Enable Mesos HTTP API. This patch completes the work described in the design doc [1] and enables operators to choose, via the new `-mesos_driver` scheduler flag, between two V1 Mesos API implementations (in addition to the original scheduler driver). The first is `V0Mesos`, which offers the V1 API backed by the existing scheduler driver; the second is `V1Mesos`, which offers the V1 API backed by a new HTTP API implementation.
There are three sets of changes in this patch. First, the V1 Mesos code requires a Scheduler callback with a different API. To maximize code reuse, event handling logic was extracted into a `MesosCallbackHandler` class. `VersionedMesosSchedulerImpl` was created to implement the new callback interface. Both callback implementations now use the handler class for their logic. Second, a new driver implementation using the new API was created. All of the logic for the new driver is encapsulated in the `VersionedSchedulerDriverService` class. Third, some wiring changes were made so that Guice can do its work and operators can select between the different driver implementations. [1] https://docs.google.com/document/d/1bWK8ldaQSsRXvdKwTh8tyR_0qMxAlnMW70eOKoU3myo Testing Done: The e2e test has been run three times, each time with a different driver option. Bugs closed: AURORA-1887, AURORA-1888 Reviewed at https://reviews.apache.org/r/57061/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/705dbc7c Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/705dbc7c Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/705dbc7c Branch: refs/heads/master Commit: 705dbc7cd7c3ff477bcf766cdafe49a68ab47dee Parents: 2652fe0 Author: Zameer Manji <[email protected]> Authored: Thu Mar 2 15:07:11 2017 -0800 Committer: Zameer Manji <[email protected]> Committed: Thu Mar 2 15:07:11 2017 -0800 ---------------------------------------------------------------------- RELEASE-NOTES.md | 7 + examples/vagrant/upstart/aurora-scheduler.conf | 5 +- .../aurora/benchmark/StatusUpdateBenchmark.java | 6 +- .../apache/aurora/scheduler/app/AppModule.java | 12 +- .../aurora/scheduler/app/SchedulerMain.java | 22 +- .../scheduler/mesos/LibMesosLoadingModule.java | 29 +- .../scheduler/mesos/MesosCallbackHandler.java | 288 +++++++++++++ .../scheduler/mesos/MesosSchedulerImpl.java | 212 +-------- .../scheduler/mesos/ProtosConversion.java | 28 ++
.../scheduler/mesos/SchedulerDriverModule.java | 50 ++- .../scheduler/mesos/VersionedDriverFactory.java | 32 ++ .../mesos/VersionedMesosSchedulerImpl.java | 198 +++++++++ .../mesos/VersionedSchedulerDriverService.java | 254 +++++++++++ .../aurora/scheduler/app/SchedulerIT.java | 7 +- .../mesos/MesosCallbackHandlerTest.java | 430 +++++++++++++++++++ .../scheduler/mesos/MesosSchedulerImplTest.java | 424 ++++-------------- .../mesos/VersionedMesosSchedulerImplTest.java | 275 ++++++++++++ .../VersionedSchedulerDriverServiceTest.java | 194 +++++++++ .../aurora/scheduler/thrift/ThriftIT.java | 3 +- 19 files changed, 1907 insertions(+), 569 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/RELEASE-NOTES.md ---------------------------------------------------------------------- diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md index 2391d32..1142f75 100644 --- a/RELEASE-NOTES.md +++ b/RELEASE-NOTES.md @@ -6,6 +6,13 @@ - Add message parameter to `killTasks` RPC. - Add prune_tasks endpoint to aurora_admin. See aurora_admin prune_tasks -h for usage information. - Add support for per-task volume mounts for Mesos containers to the Aurora config DSL. +- Add the `-mesos_driver` flag to the scheduler with three possible options: + `SCHEDULER_DRIVER`, `V0_DRIVER`, `V1_DRIVER`. The first uses the original driver + and the latter two use two new drivers from `libmesos`. `V0_DRIVER` uses the + `SCHEDULER_DRIVER` under the hood and `V1_DRIVER` uses a new HTTP API-aware + driver. Users that want to use the HTTP API should use `V1_DRIVER`. + Performance-sensitive users should stick with the `SCHEDULER_DRIVER` or + `V0_DRIVER` drivers.
0.17.0 ====== http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/examples/vagrant/upstart/aurora-scheduler.conf ---------------------------------------------------------------------- diff --git a/examples/vagrant/upstart/aurora-scheduler.conf b/examples/vagrant/upstart/aurora-scheduler.conf index 49fdcbd..31fa036 100644 --- a/examples/vagrant/upstart/aurora-scheduler.conf +++ b/examples/vagrant/upstart/aurora-scheduler.conf @@ -16,7 +16,7 @@ respawn post-stop exec sleep 5 # Environment variables control the behavior of the Mesos scheduler driver (libmesos). -env GLOG_v=0 +env GLOG_v=1 env LIBPROCESS_PORT=8083 env LIBPROCESS_IP=192.168.33.7 env DIST_DIR=/home/vagrant/aurora/dist @@ -54,4 +54,5 @@ exec bin/aurora-scheduler \ -enable_revocable_ram=true \ -allow_gpu_resource=true \ -allow_container_volumes=true \ - -offer_filter_duration=0secs + -offer_filter_duration=0secs \ + -mesos_driver=V1_DRIVER http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java index 6c2bf46..95496c1 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java +++ b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java @@ -61,6 +61,8 @@ import org.apache.aurora.scheduler.filter.SchedulingFilterImpl; import org.apache.aurora.scheduler.mesos.Driver; import org.apache.aurora.scheduler.mesos.DriverFactory; import org.apache.aurora.scheduler.mesos.DriverSettings; +import org.apache.aurora.scheduler.mesos.MesosCallbackHandler; +import org.apache.aurora.scheduler.mesos.MesosCallbackHandler.MesosCallbackHandlerImpl; import org.apache.aurora.scheduler.mesos.MesosSchedulerImpl; import org.apache.aurora.scheduler.mesos.ProtosConversion; import 
org.apache.aurora.scheduler.mesos.TestExecutorSettings; @@ -183,8 +185,10 @@ public class StatusUpdateBenchmark { bind(Driver.class).toInstance(new FakeDriver()); bind(Scheduler.class).to(MesosSchedulerImpl.class); bind(MesosSchedulerImpl.class).in(Singleton.class); + bind(MesosCallbackHandler.class).to(MesosCallbackHandlerImpl.class); + bind(MesosCallbackHandlerImpl.class).in(Singleton.class); bind(Executor.class) - .annotatedWith(MesosSchedulerImpl.SchedulerExecutor.class) + .annotatedWith(MesosCallbackHandlerImpl.SchedulerExecutor.class) .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor( "SchedulerImpl-%d", LoggerFactory.getLogger(StatusUpdateBenchmark.class))); http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/app/AppModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java index e2ef9c3..081dff2 100644 --- a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java +++ b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java @@ -35,6 +35,7 @@ import org.apache.aurora.gen.Container; import org.apache.aurora.gen.Container._Fields; import org.apache.aurora.scheduler.SchedulerModule; import org.apache.aurora.scheduler.SchedulerServicesModule; +import org.apache.aurora.scheduler.app.SchedulerMain.DriverKind; import org.apache.aurora.scheduler.async.AsyncModule; import org.apache.aurora.scheduler.configuration.ConfigurationManager.ConfigurationManagerSettings; import org.apache.aurora.scheduler.events.PubsubEventModule; @@ -105,13 +106,15 @@ public class AppModule extends AbstractModule { private static final Arg<Boolean> ALLOW_CONTAINER_VOLUMES = Arg.create(false); private final ConfigurationManagerSettings configurationManagerSettings; + private final DriverKind kind; @VisibleForTesting - public AppModule(ConfigurationManagerSettings 
configurationManagerSettings) { + public AppModule(ConfigurationManagerSettings configurationManagerSettings, DriverKind kind) { this.configurationManagerSettings = requireNonNull(configurationManagerSettings); + this.kind = kind; } - public AppModule(boolean allowGpuResource) { + public AppModule(boolean allowGpuResource, DriverKind kind) { this(new ConfigurationManagerSettings( ImmutableSet.copyOf(ALLOWED_CONTAINER_TYPES.get()), ENABLE_DOCKER_PARAMETERS.get(), @@ -119,7 +122,8 @@ public class AppModule extends AbstractModule { REQUIRE_DOCKER_USE_EXECUTOR.get(), allowGpuResource, ENABLE_MESOS_FETCHER.get(), - ALLOW_CONTAINER_VOLUMES.get())); + ALLOW_CONTAINER_VOLUMES.get()), + kind); } @Override @@ -149,7 +153,7 @@ public class AppModule extends AbstractModule { install(new QuotaModule()); install(new JettyServerModule()); install(new PreemptorModule()); - install(new SchedulerDriverModule()); + install(new SchedulerDriverModule(kind)); install(new SchedulerServicesModule()); install(new SchedulerModule()); install(new StateModule()); http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java index 805e9de..3665c4d 100644 --- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java +++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java @@ -93,6 +93,24 @@ public class SchedulerMain { @CmdLine(name = "allow_gpu_resource", help = "Allow jobs to request Mesos GPU resource.") private static final Arg<Boolean> ALLOW_GPU_RESOURCE = Arg.create(false); + public enum DriverKind { + // TODO(zmanji): Remove this option once V0_DRIVER has been proven out in production. + // This is the original driver that libmesos shipped with. 
Uses unversioned protobufs, and has + // minimal backwards compatability guarantees. + SCHEDULER_DRIVER, + // These are the new drivers that libmesos ships with. They use versioned (V1) protobufs for + // the Java API. + // V0 Driver offers the V1 API over the old Scheduler Driver. It does not fully support + // the V1 API (ie mesos maintenance). + V0_DRIVER, + // V1 Driver offers the V1 API over a full HTTP API implementation. It allows for maintenance + // primatives and other new features. + V1_DRIVER, + } + + @CmdLine(name = "mesos_driver", help = "Which Mesos Driver to use") + private static final Arg<DriverKind> DRIVER_IMPL = Arg.create(DriverKind.SCHEDULER_DRIVER); + @Inject private SingletonService schedulerService; @Inject private HttpService httpService; @Inject private SchedulerLifecycle schedulerLifecycle; @@ -141,7 +159,7 @@ public class SchedulerMain { return Modules.combine( new LifecycleModule(), new StatsModule(), - new AppModule(ALLOW_GPU_RESOURCE.get()), + new AppModule(ALLOW_GPU_RESOURCE.get(), DRIVER_IMPL.get()), new CronModule(), new DbModule.MigrationManagerModule(), DbModule.productionModule(Bindings.annotatedKeyFactory(Storage.Volatile.class)), @@ -203,7 +221,7 @@ public class SchedulerMain { List<Module> modules = ImmutableList.<Module>builder() .add( new CommandLineDriverSettingsModule(ALLOW_GPU_RESOURCE.get()), - new LibMesosLoadingModule(), + new LibMesosLoadingModule(DRIVER_IMPL.get()), new MesosLogStreamModule(FlaggedZooKeeperConfig.create()), new LogStorageModule(), new TierModule(), http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/mesos/LibMesosLoadingModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/LibMesosLoadingModule.java b/src/main/java/org/apache/aurora/scheduler/mesos/LibMesosLoadingModule.java index e1a2359..3e943ff 100644 --- 
a/src/main/java/org/apache/aurora/scheduler/mesos/LibMesosLoadingModule.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/LibMesosLoadingModule.java @@ -15,12 +15,39 @@ package org.apache.aurora.scheduler.mesos; import com.google.inject.AbstractModule; +import org.apache.aurora.scheduler.app.SchedulerMain; +import org.apache.mesos.v1.scheduler.V0Mesos; +import org.apache.mesos.v1.scheduler.V1Mesos; + +import static com.google.common.base.Preconditions.checkState; + /** * A module that binds a driver factory which requires the libmesos native libary. */ public class LibMesosLoadingModule extends AbstractModule { + private final SchedulerMain.DriverKind kind; + + public LibMesosLoadingModule(SchedulerMain.DriverKind kind) { + this.kind = kind; + } + @Override protected void configure() { - bind(DriverFactory.class).to(DriverFactoryImpl.class); + switch(kind) { + case SCHEDULER_DRIVER: + bind(DriverFactory.class).to(DriverFactoryImpl.class); + break; + case V0_DRIVER: + bind(VersionedDriverFactory.class).toInstance((scheduler, frameworkInfo, master, creds) + -> new V0Mesos(scheduler, frameworkInfo, master, creds.orNull())); + break; + case V1_DRIVER: + bind(VersionedDriverFactory.class).toInstance((scheduler, frameworkInfo, master, creds) + -> new V1Mesos(scheduler, master, creds.orNull())); + break; + default: + checkState(false, "Unknown driver kind"); + break; + } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java new file mode 100644 index 0000000..801551b --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java @@ -0,0 +1,288 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + 
* you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.mesos; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicLong; +import javax.inject.Inject; +import javax.inject.Qualifier; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Optional; + +import org.apache.aurora.common.application.Lifecycle; +import org.apache.aurora.common.stats.StatsProvider; +import org.apache.aurora.scheduler.HostOffer; +import org.apache.aurora.scheduler.TaskStatusHandler; +import org.apache.aurora.scheduler.base.SchedulerException; +import org.apache.aurora.scheduler.events.EventSink; +import org.apache.aurora.scheduler.events.PubsubEvent; +import org.apache.aurora.scheduler.offers.OfferManager; +import org.apache.aurora.scheduler.storage.AttributeStore; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.mesos.v1.Protos.AgentID; +import org.apache.mesos.v1.Protos.ExecutorID; +import org.apache.mesos.v1.Protos.FrameworkID; +import org.apache.mesos.v1.Protos.MasterInfo; +import org.apache.mesos.v1.Protos.Offer; +import org.apache.mesos.v1.Protos.OfferID; +import org.apache.mesos.v1.Protos.TaskStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.lang.annotation.ElementType.FIELD; 
+import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; +import static java.util.Objects.requireNonNull; + +import static org.apache.mesos.v1.Protos.TaskStatus.Reason.REASON_RECONCILIATION; + +/** + * Abstracts the logic of handling scheduler events/callbacks from Mesos. + * This interface allows the two different Mesos Scheduler Callback classes + * to share logic and to simplify testing. + */ +public interface MesosCallbackHandler { + void handleRegistration(FrameworkID frameworkId, MasterInfo masterInfo); + void handleReregistration(MasterInfo masterInfo); + void handleOffers(List<Offer> offers); + void handleDisconnection(); + void handleRescind(OfferID offerId); + void handleMessage(ExecutorID executor, AgentID agent); + void handleError(String message); + void handleUpdate(TaskStatus status); + void handleLostAgent(AgentID agentId); + void handleLostExecutor(ExecutorID executorID, AgentID slaveID, int status); + + class MesosCallbackHandlerImpl implements MesosCallbackHandler { + + private final TaskStatusHandler taskStatusHandler; + private final OfferManager offerManager; + private final Storage storage; + private final Lifecycle lifecycle; + private final EventSink eventSink; + private final Executor executor; + private final Logger log; + + private final AtomicLong offersRescinded; + private final AtomicLong slavesLost; + private final AtomicLong reRegisters; + private final AtomicLong offersRecieved; + private final AtomicLong disconnects; + private final AtomicLong executorsLost; + + /** + * Binding annotation for the executor the incoming Mesos message handler uses. + */ + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + public @interface SchedulerExecutor { } + + /** + * Creates a new handler for callbacks. + * + * @param storage Store to save host attributes into. 
+ * @param lifecycle Application lifecycle manager. + * @param taskStatusHandler Task status update manager. + * @param offerManager Offer manager. + * @param eventSink Pubsub sink to send driver status changes to. + * @param executor Executor for async work + */ + @Inject + public MesosCallbackHandlerImpl( + Storage storage, + Lifecycle lifecycle, + TaskStatusHandler taskStatusHandler, + OfferManager offerManager, + EventSink eventSink, + @SchedulerExecutor Executor executor, + StatsProvider statsProvider) { + + this( + storage, + lifecycle, + taskStatusHandler, + offerManager, + eventSink, + executor, + LoggerFactory.getLogger(MesosCallbackHandlerImpl.class), + statsProvider); + } + + @VisibleForTesting + MesosCallbackHandlerImpl( + Storage storage, + Lifecycle lifecycle, + TaskStatusHandler taskStatusHandler, + OfferManager offerManager, + EventSink eventSink, + Executor executor, + Logger log, + StatsProvider statsProvider) { + + this.storage = requireNonNull(storage); + this.lifecycle = requireNonNull(lifecycle); + this.taskStatusHandler = requireNonNull(taskStatusHandler); + this.offerManager = requireNonNull(offerManager); + this.eventSink = requireNonNull(eventSink); + this.executor = requireNonNull(executor); + this.log = requireNonNull(log); + + this.offersRescinded = statsProvider.makeCounter("offers_rescinded"); + this.slavesLost = statsProvider.makeCounter("slaves_lost"); + this.reRegisters = statsProvider.makeCounter("scheduler_framework_reregisters"); + this.offersRecieved = statsProvider.makeCounter("scheduler_resource_offers"); + this.disconnects = statsProvider.makeCounter("scheduler_framework_disconnects"); + this.executorsLost = statsProvider.makeCounter("scheduler_lost_executors"); + } + + @Override + public void handleRegistration(FrameworkID frameworkId, MasterInfo masterInfo) { + log.info("Registered with ID " + frameworkId + ", master: " + masterInfo); + + storage.write( + (Storage.MutateWork.NoResult.Quiet) storeProvider -> + 
storeProvider.getSchedulerStore().saveFrameworkId(frameworkId.getValue())); + eventSink.post(new PubsubEvent.DriverRegistered()); + } + + @Override + public void handleReregistration(MasterInfo masterInfo) { + log.info("Framework re-registered with master " + masterInfo); + reRegisters.incrementAndGet(); + } + + @Override + public void handleOffers(List<Offer> offers) { + // Don't invoke the executor or storage lock if the list of offers is empty. + if (offers.isEmpty()) { + return; + } + + executor.execute(() -> { + // TODO(wfarner): Reconsider the requirements here, augment the task scheduler to skip over + // offers when the host attributes cannot be found. (AURORA-137) + storage.write((Storage.MutateWork.NoResult.Quiet) storeProvider -> { + for (Offer offer : offers) { + IHostAttributes attributes = + AttributeStore.Util.mergeOffer(storeProvider.getAttributeStore(), offer); + storeProvider.getAttributeStore().saveHostAttributes(attributes); + log.debug("Received offer: {}", offer); + offersRecieved.incrementAndGet(); + offerManager.addOffer(new HostOffer(offer, attributes)); + } + }); + }); + } + + @Override + public void handleDisconnection() { + log.warn("Framework disconnected."); + this.disconnects.incrementAndGet(); + eventSink.post(new PubsubEvent.DriverDisconnected()); + } + + @Override + public void handleRescind(OfferID offerId) { + log.info("Offer rescinded: " + offerId); + offerManager.cancelOffer(offerId); + offersRescinded.incrementAndGet(); + } + + @Override + public void handleMessage(ExecutorID executorID, AgentID agentID) { + log.warn( + "Ignoring framework message from {} on {}.", + executorID.getValue(), + agentID.getValue()); + } + + @Override + public void handleError(String message) { + log.error("Received error message: " + message); + lifecycle.shutdown(); + } + + private static void logStatusUpdate(Logger logger, TaskStatus status) { + // Periodic task reconciliation runs generate a large amount of no-op messages. 
+ // Suppress logging for reconciliation status updates by default. + boolean debugLevel = status.hasReason() && status.getReason() == REASON_RECONCILIATION; + + StringBuilder message = new StringBuilder("Received status update for task ") + .append(status.getTaskId().getValue()) + .append(" in state ") + .append(status.getState()); + if (status.hasSource()) { + message.append(" from ").append(status.getSource()); + } + if (status.hasReason()) { + message.append(" with ").append(status.getReason()); + } + if (status.hasMessage()) { + message.append(": ").append(status.getMessage()); + } + if (debugLevel) { + logger.debug(message.toString()); + } else { + logger.info(message.toString()); + } + } + + private static final Function<Double, Long> SECONDS_TO_MICROS = + seconds -> (long) (seconds * 1E6); + + @Override + public void handleUpdate(TaskStatus status) { + logStatusUpdate(log, status); + eventSink.post(new PubsubEvent.TaskStatusReceived( + status.getState(), + // Source and Reason are enums. They cannot be null so we we need to use `hasXXX`. + status.hasSource() ? Optional.of(status.getSource()) : Optional.absent(), + status.hasReason() ? Optional.of(status.getReason()) : Optional.absent(), + Optional.fromNullable(status.getTimestamp()).transform(SECONDS_TO_MICROS))); + + try { + // The status handler is responsible for acknowledging the update. + taskStatusHandler.statusUpdate(status); + } catch (SchedulerException e) { + log.error("Status update failed due to scheduler exception: " + e, e); + // We re-throw the exception here to trigger an abort of the driver. 
+ throw e; + } + } + + @Override + public void handleLostAgent(AgentID agentId) { + log.info("Received notification of lost agent: " + agentId); + slavesLost.incrementAndGet(); + } + + @Override + public void handleLostExecutor(ExecutorID executorID, AgentID slaveID, int status) { + // With the current implementation of MESOS-313, Mesos is also reporting clean terminations of + // custom executors via the executorLost callback. + if (status != 0) { + log.warn("Lost executor " + executorID + " on slave " + slaveID + " with status " + status); + executorsLost.incrementAndGet(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java index eb21096..c3a34d2 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java @@ -13,37 +13,14 @@ */ package org.apache.aurora.scheduler.mesos; -import java.lang.annotation.Retention; -import java.lang.annotation.Target; import java.util.List; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicLong; - -import javax.inject.Inject; -import javax.inject.Qualifier; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.inject.Inject; import org.apache.aurora.GuiceUtils.AllowUnchecked; -import org.apache.aurora.common.application.Lifecycle; import org.apache.aurora.common.inject.TimedInterceptor.Timed; -import org.apache.aurora.common.stats.StatsProvider; -import org.apache.aurora.scheduler.HostOffer; 
-import org.apache.aurora.scheduler.TaskStatusHandler; -import org.apache.aurora.scheduler.base.SchedulerException; -import org.apache.aurora.scheduler.events.EventSink; -import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected; -import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered; -import org.apache.aurora.scheduler.events.PubsubEvent.TaskStatusReceived; -import org.apache.aurora.scheduler.offers.OfferManager; -import org.apache.aurora.scheduler.stats.CachedCounters; -import org.apache.aurora.scheduler.storage.AttributeStore; -import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; -import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.mesos.Protos.ExecutorID; import org.apache.mesos.Protos.FrameworkID; import org.apache.mesos.Protos.MasterInfo; @@ -53,103 +30,29 @@ import org.apache.mesos.Protos.SlaveID; import org.apache.mesos.Protos.TaskStatus; import org.apache.mesos.Scheduler; import org.apache.mesos.SchedulerDriver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import static java.lang.annotation.ElementType.FIELD; -import static java.lang.annotation.ElementType.METHOD; -import static java.lang.annotation.ElementType.PARAMETER; -import static java.lang.annotation.RetentionPolicy.RUNTIME; import static java.util.Objects.requireNonNull; -import static org.apache.mesos.Protos.TaskStatus.Reason.REASON_RECONCILIATION; +import static com.google.common.base.Preconditions.checkState; + +import static org.apache.aurora.scheduler.mesos.ProtosConversion.convert; /** - * Location for communication with mesos. + * Implementation of Scheduler callback interfaces for Mesos SchedulerDriver. 
*/ @VisibleForTesting public class MesosSchedulerImpl implements Scheduler { - private final TaskStatusHandler taskStatusHandler; - private final OfferManager offerManager; - private final Storage storage; - private final Lifecycle lifecycle; - private final EventSink eventSink; - private final Executor executor; - private final Logger log; - private final CachedCounters counters; + private final MesosCallbackHandler handler; private volatile boolean isRegistered = false; - private final AtomicLong offersRescinded; - private final AtomicLong slavesLost; - /** - * Binding annotation for the executor the incoming Mesos message handler uses. - */ - @VisibleForTesting - @Qualifier - @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) - public @interface SchedulerExecutor { } - - /** - * Creates a new scheduler. - * - * @param storage Store to save host attributes into. - * @param lifecycle Application lifecycle manager. - * @param taskStatusHandler Task status update manager. - * @param offerManager Offer manager. - * @param eventSink Pubsub sink to send driver status changes to. 
- * @param executor Executor for async work - */ @Inject - public MesosSchedulerImpl( - Storage storage, - Lifecycle lifecycle, - TaskStatusHandler taskStatusHandler, - OfferManager offerManager, - EventSink eventSink, - @SchedulerExecutor Executor executor, - CachedCounters counters, - StatsProvider statsProvider) { - - this( - storage, - lifecycle, - taskStatusHandler, - offerManager, - eventSink, - executor, - counters, - LoggerFactory.getLogger(MesosSchedulerImpl.class), - statsProvider); - } - - @VisibleForTesting - MesosSchedulerImpl( - Storage storage, - Lifecycle lifecycle, - TaskStatusHandler taskStatusHandler, - OfferManager offerManager, - EventSink eventSink, - Executor executor, - CachedCounters counters, - Logger log, - StatsProvider statsProvider) { - - this.storage = requireNonNull(storage); - this.lifecycle = requireNonNull(lifecycle); - this.taskStatusHandler = requireNonNull(taskStatusHandler); - this.offerManager = requireNonNull(offerManager); - this.eventSink = requireNonNull(eventSink); - this.executor = requireNonNull(executor); - this.counters = requireNonNull(counters); - this.log = requireNonNull(log); - this.offersRescinded = statsProvider.makeCounter("offers_rescinded"); - this.slavesLost = statsProvider.makeCounter("slaves_lost"); + MesosSchedulerImpl(MesosCallbackHandler handler) { + this.handler = requireNonNull(handler); } @Override public void slaveLost(SchedulerDriver schedulerDriver, SlaveID slaveId) { - log.info("Received notification of lost agent: " + slaveId); - slavesLost.incrementAndGet(); + handler.handleLostAgent(convert(slaveId)); } @Override @@ -157,122 +60,48 @@ public class MesosSchedulerImpl implements Scheduler { SchedulerDriver driver, final FrameworkID frameworkId, MasterInfo masterInfo) { - - log.info("Registered with ID " + frameworkId + ", master: " + masterInfo); - - storage.write( - (NoResult.Quiet) storeProvider -> - storeProvider.getSchedulerStore().saveFrameworkId(frameworkId.getValue())); + 
handler.handleRegistration(convert(frameworkId), convert(masterInfo)); isRegistered = true; - eventSink.post(new DriverRegistered()); } @Override public void disconnected(SchedulerDriver schedulerDriver) { - log.warn("Framework disconnected."); - counters.get("scheduler_framework_disconnects").incrementAndGet(); - eventSink.post(new DriverDisconnected()); + handler.handleDisconnection(); } @Override public void reregistered(SchedulerDriver schedulerDriver, MasterInfo masterInfo) { - log.info("Framework re-registered with master " + masterInfo); - counters.get("scheduler_framework_reregisters").incrementAndGet(); + handler.handleReregistration(convert(masterInfo)); } @Timed("scheduler_resource_offers") @Override public void resourceOffers(SchedulerDriver driver, final List<Offer> offers) { - Preconditions.checkState(isRegistered, "Must be registered before receiving offers."); - - executor.execute(() -> { - // TODO(wfarner): Reconsider the requirements here, augment the task scheduler to skip over - // offers when the host attributes cannot be found. 
(AURORA-137) - storage.write((NoResult.Quiet) storeProvider -> { - for (Offer offer : offers) { - org.apache.mesos.v1.Protos.Offer o = ProtosConversion.convert(offer); - IHostAttributes attributes = - AttributeStore.Util.mergeOffer(storeProvider.getAttributeStore(), o); - storeProvider.getAttributeStore().saveHostAttributes(attributes); - log.debug("Received offer: {}", offer); - counters.get("scheduler_resource_offers").incrementAndGet(); - offerManager.addOffer(new HostOffer(o, attributes)); - } - }); - }); + checkState(isRegistered, "Must be registered before receiving offers."); + handler.handleOffers(Lists.transform(offers, ProtosConversion::convert)); } @Override public void offerRescinded(SchedulerDriver schedulerDriver, OfferID offerId) { - log.info("Offer rescinded: " + offerId); - offerManager.cancelOffer(ProtosConversion.convert(offerId)); - offersRescinded.incrementAndGet(); - } - - private static void logStatusUpdate(Logger logger, TaskStatus status) { - // Periodic task reconciliation runs generate a large amount of no-op messages. - // Suppress logging for reconciliation status updates by default. 
- boolean debugLevel = status.hasReason() && status.getReason() == REASON_RECONCILIATION; - - StringBuilder message = new StringBuilder("Received status update for task ") - .append(status.getTaskId().getValue()) - .append(" in state ") - .append(status.getState()); - if (status.hasSource()) { - message.append(" from ").append(status.getSource()); - } - if (status.hasReason()) { - message.append(" with ").append(status.getReason()); - } - if (status.hasMessage()) { - message.append(": ").append(status.getMessage()); - } - if (debugLevel) { - logger.debug(message.toString()); - } else { - logger.info(message.toString()); - } + handler.handleRescind(convert(offerId)); } - private static final Function<Double, Long> SECONDS_TO_MICROS = seconds -> (long) (seconds * 1E6); - @AllowUnchecked @Timed("scheduler_status_update") @Override public void statusUpdate(SchedulerDriver driver, TaskStatus status) { - logStatusUpdate(log, status); - org.apache.mesos.v1.Protos.TaskStatus converted = ProtosConversion.convert(status); - eventSink.post(new TaskStatusReceived( - converted.getState(), - Optional.fromNullable(converted.getSource()), - converted.hasReason() ? Optional.of(converted.getReason()) : Optional.absent(), - Optional.fromNullable(converted.getTimestamp()).transform(SECONDS_TO_MICROS))); - - try { - // The status handler is responsible for acknowledging the update. - taskStatusHandler.statusUpdate(converted); - } catch (SchedulerException e) { - log.error("Status update failed due to scheduler exception: " + e, e); - // We re-throw the exception here to trigger an abort of the driver. 
- throw e; - } + handler.handleUpdate(convert(status)); } @Override public void error(SchedulerDriver driver, String message) { - log.error("Received error message: " + message); - lifecycle.shutdown(); + handler.handleError(message); } @Override public void executorLost(SchedulerDriver schedulerDriver, ExecutorID executorID, SlaveID slaveID, int status) { - // With the current implementation of MESOS-313, Mesos is also reporting clean terminations of - // custom executors via the executorLost callback. - if (status != 0) { - log.warn("Lost executor " + executorID + " on slave " + slaveID + " with status " + status); - counters.get("scheduler_lost_executors").incrementAndGet(); - } + handler.handleLostExecutor(convert(executorID), convert(slaveID), status); } @Timed("scheduler_framework_message") @@ -282,7 +111,6 @@ public class MesosSchedulerImpl implements Scheduler { ExecutorID executorID, SlaveID slave, byte[] data) { - - log.warn("Ignoring framework message."); + handler.handleMessage(convert(executorID), convert(slave)); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/mesos/ProtosConversion.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/ProtosConversion.java b/src/main/java/org/apache/aurora/scheduler/mesos/ProtosConversion.java index bc9e23b..26112af 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/ProtosConversion.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/ProtosConversion.java @@ -55,16 +55,36 @@ public final class ProtosConversion { return convert(id, Protos.OfferID.newBuilder()); } + public static Protos.FrameworkID convert(org.apache.mesos.Protos.FrameworkID id) { + return convert(id, Protos.FrameworkID.newBuilder()); + } + + public static Protos.MasterInfo convert(org.apache.mesos.Protos.MasterInfo id) { + return convert(id, Protos.MasterInfo.newBuilder()); + } + public static 
Protos.TaskStatus convert(org.apache.mesos.Protos.TaskStatus s) { return convert(s, Protos.TaskStatus.newBuilder()); } + public static Protos.AgentID convert(org.apache.mesos.Protos.SlaveID s) { + return convert(s, Protos.AgentID.newBuilder()); + } + + public static Protos.ExecutorID convert(org.apache.mesos.Protos.ExecutorID s) { + return convert(s, Protos.ExecutorID.newBuilder()); + } + // Methods to convert from V1 to unversioned. public static org.apache.mesos.Protos.FrameworkID convert(Protos.FrameworkID id) { return convert(id, org.apache.mesos.Protos.FrameworkID.newBuilder()); } + public static org.apache.mesos.Protos.MasterInfo convert(Protos.MasterInfo id) { + return convert(id, org.apache.mesos.Protos.MasterInfo.newBuilder()); + } + public static org.apache.mesos.Protos.TaskID convert(Protos.TaskID id) { return convert(id, org.apache.mesos.Protos.TaskID.newBuilder()); } @@ -96,4 +116,12 @@ public final class ProtosConversion { public static org.apache.mesos.Protos.Offer convert(Protos.Offer f) { return convert(f, org.apache.mesos.Protos.Offer.newBuilder()); } + + public static org.apache.mesos.Protos.ExecutorID convert(Protos.ExecutorID f) { + return convert(f, org.apache.mesos.Protos.ExecutorID.newBuilder()); + } + + public static org.apache.mesos.Protos.SlaveID convert(Protos.AgentID f) { + return convert(f, org.apache.mesos.Protos.SlaveID.newBuilder()); + } } http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java index 5519323..10d4f1b 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java @@ -18,38 +18,58 @@ import 
java.util.concurrent.Executor; import javax.inject.Singleton; import com.google.inject.AbstractModule; -import com.google.inject.PrivateModule; +import org.apache.aurora.scheduler.app.SchedulerMain; import org.apache.aurora.scheduler.base.AsyncUtil; import org.apache.aurora.scheduler.events.PubsubEventModule; +import org.apache.aurora.scheduler.mesos.MesosCallbackHandler.MesosCallbackHandlerImpl; import org.apache.mesos.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.google.common.base.Preconditions.checkState; + /** * A module that creates a {@link Driver} binding. */ public class SchedulerDriverModule extends AbstractModule { private static final Logger LOG = LoggerFactory.getLogger(SchedulerDriverModule.class); + private final SchedulerMain.DriverKind kind; + + public SchedulerDriverModule(SchedulerMain.DriverKind kind) { + this.kind = kind; + } @Override protected void configure() { - install(new PrivateModule() { - @Override - protected void configure() { + bind(Scheduler.class).to(MesosSchedulerImpl.class); + bind(org.apache.mesos.v1.scheduler.Scheduler.class).to(VersionedMesosSchedulerImpl.class); + bind(MesosSchedulerImpl.class).in(Singleton.class); + bind(MesosCallbackHandler.class).to(MesosCallbackHandlerImpl.class); + bind(MesosCallbackHandlerImpl.class).in(Singleton.class); + // TODO(zmanji): Create singleThreadedExecutor (non-scheduled) variant. + bind(Executor.class).annotatedWith(MesosCallbackHandlerImpl.SchedulerExecutor.class) + .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor("SchedulerImpl-%d", LOG)); + + switch (kind) { + case SCHEDULER_DRIVER: bind(Driver.class).to(SchedulerDriverService.class); bind(SchedulerDriverService.class).in(Singleton.class); - expose(Driver.class); - - bind(Scheduler.class).to(MesosSchedulerImpl.class); - bind(MesosSchedulerImpl.class).in(Singleton.class); - expose(Scheduler.class); - - // TODO(zmanji): Create singleThreadedExecutor (non-scheduled) variant. 
- bind(Executor.class).annotatedWith(MesosSchedulerImpl.SchedulerExecutor.class) - .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor("SchedulerImpl-%d", LOG)); - } - }); + break; + case V0_DRIVER: + bind(Driver.class).to(VersionedSchedulerDriverService.class); + bind(VersionedSchedulerDriverService.class).in(Singleton.class); + PubsubEventModule.bindSubscriber(binder(), VersionedSchedulerDriverService.class); + break; + case V1_DRIVER: + bind(Driver.class).to(VersionedSchedulerDriverService.class); + bind(VersionedSchedulerDriverService.class).in(Singleton.class); + PubsubEventModule.bindSubscriber(binder(), VersionedSchedulerDriverService.class); + break; + default: + checkState(false, "Unknown driver kind."); + break; + } PubsubEventModule.bindSubscriber(binder(), TaskStatusStats.class); bind(TaskStatusStats.class).in(Singleton.class); http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/mesos/VersionedDriverFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/VersionedDriverFactory.java b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedDriverFactory.java new file mode 100644 index 0000000..8afeec1 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedDriverFactory.java @@ -0,0 +1,32 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.mesos; + +import com.google.common.base.Optional; + +import org.apache.mesos.v1.Protos; +import org.apache.mesos.v1.scheduler.Mesos; +import org.apache.mesos.v1.scheduler.Scheduler; + +/** + * A layer over the constructor for {@link org.apache.mesos.v1.scheduler.Mesos}. This is needed + * since {@link org.apache.mesos.v1.scheduler.Mesos} implementations statically load libmesos. + */ +public interface VersionedDriverFactory { + Mesos create( + Scheduler scheduler, + Protos.FrameworkInfo frameworkInfo, + String master, + Optional<Protos.Credential> credentials); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java new file mode 100644 index 0000000..84e3f47 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java @@ -0,0 +1,198 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.mesos; + +import java.util.List; +import javax.inject.Inject; + +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.Lists; + +import org.apache.aurora.common.inject.TimedInterceptor; +import org.apache.aurora.scheduler.stats.CachedCounters; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.mesos.v1.Protos; +import org.apache.mesos.v1.scheduler.Mesos; +import org.apache.mesos.v1.scheduler.Protos.Call; +import org.apache.mesos.v1.scheduler.Protos.Event; +import org.apache.mesos.v1.scheduler.Scheduler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Objects.requireNonNull; + +import static com.google.common.base.Preconditions.checkState; + +/** + * Implementation of Scheduler callback interfaces for the V1 Driver. 
+ */ +public class VersionedMesosSchedulerImpl implements Scheduler { + private static final Logger LOG = LoggerFactory.getLogger(VersionedMesosSchedulerImpl.class); + + private final CachedCounters counters; + private final MesosCallbackHandler handler; + private final Storage storage; + private final DriverSettings settings; + + private volatile boolean isRegistered = false; + + private static final String EVENT_COUNTER_STAT_PREFIX = "mesos_scheduler_event_"; + // A cache to hold the metric names to prevent us from creating strings for every event + private final LoadingCache<Event.Type, String> eventMetricNameCache = CacheBuilder.newBuilder() + .maximumSize(Event.Type.values().length) + .initialCapacity(Event.Type.values().length) + .build(new CacheLoader<Event.Type, String>() { + @Override + public String load(Event.Type key) throws Exception { + return EVENT_COUNTER_STAT_PREFIX + key.name(); + } + }); + + @Inject + VersionedMesosSchedulerImpl( + MesosCallbackHandler handler, + CachedCounters counters, + Storage storage, + DriverSettings settings) { + this.handler = requireNonNull(handler); + this.counters = requireNonNull(counters); + this.storage = requireNonNull(storage); + this.settings = requireNonNull(settings); + initializeEventMetrics(); + } + + @Override + public void connected(Mesos mesos) { + LOG.info("Connected to Mesos master."); + + Optional<String> frameworkId = storage.read( + storeProvider -> storeProvider.getSchedulerStore().fetchFrameworkId()); + + Protos.FrameworkInfo.Builder frameworkBuilder = settings.getFrameworkInfo().toBuilder(); + + Call.Builder call = Call.newBuilder().setType(Call.Type.SUBSCRIBE); + + if (frameworkId.isPresent()) { + LOG.info("Found persisted framework ID: " + frameworkId); + Protos.FrameworkID id = Protos.FrameworkID.newBuilder().setValue(frameworkId.get()).build(); + frameworkBuilder.setId(id); + call.setFrameworkId(id); + } else { + frameworkBuilder.clearId(); + call.clearFrameworkId(); + LOG.warn("Did not find a 
persisted framework ID, connecting as a new framework."); + } + + LOG.info("Sending subscribe call"); + mesos.send(call.setSubscribe(Call.Subscribe.newBuilder() + .setFrameworkInfo(frameworkBuilder.build()) + .build()) + .build()); + } + + @Override + public void disconnected(Mesos mesos) { + handler.handleDisconnection(); + } + + private void initializeEventMetrics() { + // For variable named metrics that are keyed on mesos enums, this ensures that we set + // all possible metrics to 0. + for (Event.Type type : Event.Type.values()) { + this.counters.get(eventMetricNameCache.getUnchecked(type)); + } + } + + private void countEventMetrics(Event event) { + this.counters.get(eventMetricNameCache.getUnchecked(event.getType())).incrementAndGet(); + } + + @TimedInterceptor.Timed("scheduler_received") + @Override + public void received(Mesos mesos, Event event) { + countEventMetrics(event); + switch(event.getType()) { + case SUBSCRIBED: + Event.Subscribed subscribed = event.getSubscribed(); + if (isRegistered) { + handler.handleReregistration(subscribed.getMasterInfo()); + } else { + handler.handleRegistration(subscribed.getFrameworkId(), subscribed.getMasterInfo()); + isRegistered = true; + } + break; + + case OFFERS: + checkState(isRegistered, "Must be registered before receiving offers."); + handler.handleOffers(event.getOffers().getOffersList()); + break; + + case RESCIND: + handler.handleRescind(event.getRescind().getOfferId()); + break; + + case INVERSE_OFFERS: + List<Protos.InverseOffer> offers = event.getInverseOffers().getInverseOffersList(); + String ids = Joiner.on(",").join( + Lists.transform(offers, input -> input.getId().getValue())); + LOG.warn("Ignoring inverse offers: {}", ids); + break; + + case RESCIND_INVERSE_OFFER: + Protos.OfferID id = event.getRescindInverseOffer().getInverseOfferId(); + LOG.warn("Ignoring rescinded inverse offer: {}", id); + break; + + case UPDATE: + Protos.TaskStatus status = event.getUpdate().getStatus(); + 
handler.handleUpdate(status); + break; + + case MESSAGE: + Event.Message m = event.getMessage(); + handler.handleMessage(m.getExecutorId(), m.getAgentId()); + break; + + case ERROR: + handler.handleError(event.getError().getMessage()); + break; + + case FAILURE: + Event.Failure failure = event.getFailure(); + if (failure.hasExecutorId()) { + handler.handleLostExecutor( + failure.getExecutorId(), + failure.getAgentId(), + failure.getStatus()); + } else { + handler.handleLostAgent(failure.getAgentId()); + } + break; + + // TODO(zmanji): handle HEARTBEAT in a graceful manner + // For now it is ok to silently ignore heart beats because the driver wil + // detect disconnections for us. + case HEARTBEAT: + break; + + default: + LOG.warn("Unknown event from Mesos \n{}", event); + break; + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java new file mode 100644 index 0000000..9f39aeb --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java @@ -0,0 +1,254 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.mesos; + +import java.util.Collection; +import java.util.concurrent.CountDownLatch; + +import com.google.common.base.Optional; +import com.google.common.collect.Collections2; +import com.google.common.eventbus.Subscribe; +import com.google.common.util.concurrent.AbstractIdleService; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.SettableFuture; +import com.google.inject.Inject; + +import org.apache.aurora.common.base.Command; +import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered; +import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.mesos.v1.Protos.Credential; +import org.apache.mesos.v1.Protos.Filters; +import org.apache.mesos.v1.Protos.FrameworkID; +import org.apache.mesos.v1.Protos.FrameworkInfo; +import org.apache.mesos.v1.Protos.Offer.Operation; +import org.apache.mesos.v1.Protos.OfferID; +import org.apache.mesos.v1.Protos.TaskID; +import org.apache.mesos.v1.Protos.TaskStatus; +import org.apache.mesos.v1.scheduler.Mesos; +import org.apache.mesos.v1.scheduler.Protos.Call; +import org.apache.mesos.v1.scheduler.Scheduler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Objects.requireNonNull; + +import static com.google.common.base.Preconditions.checkState; + +/** + * A driver implementation that uses the V1 API drivers from libmesos. 
+ */ +class VersionedSchedulerDriverService extends AbstractIdleService + implements Driver, EventSubscriber { + private static final Logger LOG = LoggerFactory.getLogger(VersionedSchedulerDriverService.class); + + private final Storage storage; + private final DriverSettings driverSettings; + private final Scheduler scheduler; + private final VersionedDriverFactory factory; + private final SettableFuture<Mesos> mesosFuture = SettableFuture.create(); + private final CountDownLatch terminationLatch = new CountDownLatch(1); + private final CountDownLatch registrationLatch = new CountDownLatch(1); + + @Inject + VersionedSchedulerDriverService( + Storage storage, + DriverSettings settings, + Scheduler scheduler, + VersionedDriverFactory factory) { + this.storage = requireNonNull(storage); + this.driverSettings = requireNonNull(settings); + this.scheduler = requireNonNull(scheduler); + this.factory = requireNonNull(factory); + } + + private FrameworkID getFrameworkId() { + String id = storage.read(storeProvider -> + storeProvider.getSchedulerStore().fetchFrameworkId().get()); + return FrameworkID.newBuilder().setValue(id).build(); + } + + @Override + protected void startUp() throws Exception { + Optional<String> frameworkId = storage.read( + storeProvider -> storeProvider.getSchedulerStore().fetchFrameworkId()); + + LOG.info("Connecting to mesos master: " + driverSettings.getMasterUri()); + if (!driverSettings.getCredentials().isPresent()) { + LOG.warn("Connecting to master without authentication!"); + } + + FrameworkInfo.Builder frameworkBuilder = driverSettings.getFrameworkInfo().toBuilder(); + + if (frameworkId.isPresent()) { + LOG.info("Found persisted framework ID: " + frameworkId); + frameworkBuilder.setId(FrameworkID.newBuilder().setValue(frameworkId.get())); + } else { + LOG.warn("Did not find a persisted framework ID, connecting as a new framework."); + } + + Credential credential = driverSettings.getCredentials().orNull(); + Mesos mesos = factory.create( + 
scheduler, + frameworkBuilder.build(), + driverSettings.getMasterUri(), + Optional.fromNullable(credential)); + + mesosFuture.set(mesos); + } + + @Override + protected void shutDown() throws Exception { + terminationLatch.countDown(); + } + + @Override + public void acceptOffers(OfferID offerId, Collection<Operation> operations, Filters filter) { + whenRegistered(() -> { + LOG.info("Accepting offer {} with ops {}", offerId, operations); + + Futures.getUnchecked(mesosFuture).send( + Call.newBuilder() + .setFrameworkId(getFrameworkId()) + .setType(Call.Type.ACCEPT) + .setAccept( + Call.Accept.newBuilder() + .addOfferIds(offerId) + .addAllOperations(operations) + .setFilters(filter)) + .build()); + }); + + } + + @Override + public void declineOffer(OfferID offerId, Filters filter) { + whenRegistered(() -> { + LOG.info("Declining offer {}", offerId.getValue()); + + Futures.getUnchecked(mesosFuture).send( + Call.newBuilder().setType(Call.Type.DECLINE) + .setFrameworkId(getFrameworkId()) + .setDecline( + Call.Decline.newBuilder() + .setFilters(filter) + .addOfferIds(offerId)) + .build() + ); + }); + } + + @Override + public void killTask(String taskId) { + whenRegistered(() -> { + LOG.info("Killing task {}", taskId); + + Futures.getUnchecked(mesosFuture).send( + Call.newBuilder().setType(Call.Type.KILL) + .setFrameworkId(getFrameworkId()) + .setKill( + Call.Kill.newBuilder() + .setTaskId(TaskID.newBuilder().setValue(taskId))) + .build() + ); + + }); + } + + @Override + public void acknowledgeStatusUpdate(TaskStatus status) { + // The Mesos API says frameworks are only supposed to acknowledge status updates + // with a UUID. The V0Driver accepts them just fine but the V1Driver logs every time + // a status update is given without a uuid. To silence logs, we drop them here. 
+ + whenRegistered(() -> { + if (!status.hasUuid()) { + return; + } + + LOG.info("Acking status update for {} with uuid: {}", + status.getTaskId().getValue(), + status.getUuid()); + + Futures.getUnchecked(mesosFuture).send( + Call.newBuilder().setType(Call.Type.ACKNOWLEDGE) + .setFrameworkId(getFrameworkId()) + .setAcknowledge( + Call.Acknowledge.newBuilder() + .setAgentId(status.getAgentId()) + .setTaskId(status.getTaskId()) + .setUuid(status.getUuid())) + .build() + ); + }); + + } + + @Override + public void reconcileTasks(Collection<TaskStatus> statuses) { + whenRegistered(() -> { + Collection<Call.Reconcile.Task> tasks = Collections2.transform(statuses, taskStatus -> + Call.Reconcile.Task.newBuilder() + .setTaskId(taskStatus.getTaskId()) + .build()); + + Futures.getUnchecked(mesosFuture).send( + Call.newBuilder() + .setType(Call.Type.RECONCILE) + .setFrameworkId(getFrameworkId()) + .setReconcile( + Call.Reconcile.newBuilder() + .addAllTasks(tasks)) + .build() + ); + }); + } + + @Override + public void blockUntilStopped() { + ensureRunning(); + try { + terminationLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public void abort() { + terminationLatch.countDown(); + stopAsync(); + } + + @Subscribe + public void registered(DriverRegistered event) { + registrationLatch.countDown(); + } + + private void whenRegistered(Command c) { + ensureRunning(); + // We need to block until registered because thats when we are guaranteed to have our + // framework id. Without it, we cannot construct any Call objects. 
+ try { + registrationLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + c.execute(); + } + + private void ensureRunning() { + checkState(isRunning(), "Driver is not running."); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java index 0551804..f579975 100644 --- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java +++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java @@ -30,6 +30,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.hash.Hashing; +import com.google.common.net.InetAddresses; import com.google.common.util.concurrent.Atomics; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -116,6 +117,10 @@ public class SchedulerIT extends BaseZooKeeperClientTest { private static final String SERVERSET_PATH = "/fake/service/path"; private static final String STATS_URL_PREFIX = "fake_url"; private static final String FRAMEWORK_ID = "integration_test_framework_id"; + private static final Protos.MasterInfo MASTER = Protos.MasterInfo.newBuilder() + .setId("master-id") + .setIp(InetAddresses.coerceToInteger(InetAddresses.forString("1.2.3.4"))) //NOPMD + .setPort(5050).build(); private static final IHostAttributes HOST_ATTRIBUTES = IHostAttributes.build(new HostAttributes() .setHost("host") .setSlaveId("slave-id") @@ -336,7 +341,7 @@ public class SchedulerIT extends BaseZooKeeperClientTest { scheduler.getValue().registered( driver, Protos.FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(), - Protos.MasterInfo.getDefaultInstance()); 
+ MASTER); awaitSchedulerReady(); http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java new file mode 100644 index 0000000..80f631e --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java @@ -0,0 +1,430 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.mesos; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.net.InetAddresses; +import com.google.common.util.concurrent.MoreExecutors; + +import org.apache.aurora.common.application.Lifecycle; +import org.apache.aurora.common.base.Command; +import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.gen.HostAttributes; +import org.apache.aurora.scheduler.HostOffer; +import org.apache.aurora.scheduler.TaskStatusHandler; +import org.apache.aurora.scheduler.base.Conversions; +import org.apache.aurora.scheduler.base.SchedulerException; +import org.apache.aurora.scheduler.events.EventSink; +import org.apache.aurora.scheduler.events.PubsubEvent; +import org.apache.aurora.scheduler.offers.OfferManager; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; +import org.apache.mesos.v1.Protos; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.aurora.gen.MaintenanceMode.DRAINING; +import static org.apache.aurora.gen.MaintenanceMode.NONE; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.junit.Assert.assertEquals; + +public class MesosCallbackHandlerTest extends EasyMockTest { + private static final Protos.AgentID AGENT_ID = + Protos.AgentID.newBuilder().setValue("agent-id").build(); + + private static final String MASTER_ID = "master-id"; + private static final Protos.MasterInfo MASTER = Protos.MasterInfo.newBuilder() + .setId(MASTER_ID) + .setIp(InetAddresses.coerceToInteger(InetAddresses.forString("1.2.3.4"))) //NOPMD + .setPort(5050).build(); 
+ + private static final Protos.ExecutorID EXECUTOR_ID = + Protos.ExecutorID.newBuilder().setValue("executor-id").build(); + + private static final String FRAMEWORK_ID = "framework-id"; + private static final Protos.FrameworkID FRAMEWORK = + Protos.FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(); + + private static final String AGENT_HOST = "agent-hostname"; + + private static final Protos.OfferID OFFER_ID = + Protos.OfferID.newBuilder().setValue("offer-id").build(); + private static final Protos.Offer OFFER = Protos.Offer.newBuilder() + .setFrameworkId(FRAMEWORK) + .setAgentId(AGENT_ID) + .setHostname(AGENT_HOST) + .setId(OFFER_ID) + .build(); + + private static final HostOffer HOST_OFFER = new HostOffer( + OFFER, + IHostAttributes.build( + new HostAttributes() + .setHost(AGENT_HOST) + .setSlaveId(AGENT_ID.getValue()) + .setMode(NONE) + .setAttributes(ImmutableSet.of()))); + + private static final Protos.AgentID AGENT_ID_2 = + Protos.AgentID.newBuilder().setValue("agent-id2").build(); + private static final String AGENT_HOST_2 = "agent2-hostname"; + private static final Protos.OfferID OFFER_ID_2 = + Protos.OfferID.newBuilder().setValue("offer-id2").build(); + private static final Protos.Offer OFFER_2 = Protos.Offer.newBuilder() + .setFrameworkId(FRAMEWORK) + .setAgentId(AGENT_ID_2) + .setHostname(AGENT_HOST_2) + .setId(OFFER_ID_2) + .build(); + + private static final HostOffer HOST_OFFER_2 = new HostOffer( + OFFER_2, + IHostAttributes.build( + new HostAttributes() + .setHost(AGENT_HOST_2) + .setSlaveId(AGENT_ID_2.getValue()) + .setMode(NONE) + .setAttributes(ImmutableSet.of()))); + + private static final HostOffer DRAINING_HOST_OFFER = new HostOffer( + OFFER, + IHostAttributes.build(new HostAttributes() + .setHost(AGENT_HOST) + .setSlaveId(AGENT_ID.getValue()) + .setMode(DRAINING) + .setAttributes(ImmutableSet.of()))); + + private static final Protos.TaskStatus STATUS_NO_REASON = Protos.TaskStatus.newBuilder() + .setState(Protos.TaskState.TASK_RUNNING) + 
.setSource(Protos.TaskStatus.Source.SOURCE_AGENT) + .setMessage("message") + .setTimestamp(1D) + .setTaskId(Protos.TaskID.newBuilder().setValue("task-id").build()) + .build(); + + private static final Protos.TaskStatus STATUS = STATUS_NO_REASON + .toBuilder() + // Only testing data plumbing, this field with TASK_RUNNING would not normally happen, + .setReason(Protos.TaskStatus.Reason.REASON_COMMAND_EXECUTOR_FAILED) + .build(); + + private static final Protos.TaskStatus STATUS_RECONCILIATION = STATUS_NO_REASON + .toBuilder() + .setReason(Protos.TaskStatus.Reason.REASON_RECONCILIATION) + .build(); + + private StorageTestUtil storageUtil; + private Command shutdownCommand; + private TaskStatusHandler statusHandler; + private OfferManager offerManager; + private EventSink eventSink; + private FakeStatsProvider statsProvider; + private Logger injectedLog; + + private MesosCallbackHandler handler; + + @Before + public void setUp() { + + storageUtil = new StorageTestUtil(this); + shutdownCommand = createMock(Command.class); + statusHandler = createMock(TaskStatusHandler.class); + offerManager = createMock(OfferManager.class); + eventSink = createMock(EventSink.class); + statsProvider = new FakeStatsProvider(); + + createHandler(false); + } + + private void createHandler(boolean mockLogger) { + if (mockLogger) { + injectedLog = createMock(Logger.class); + } else { + injectedLog = LoggerFactory.getLogger("MesosCallbackHandlerTestLogger"); + } + + handler = new MesosCallbackHandler.MesosCallbackHandlerImpl( + storageUtil.storage, + new Lifecycle(shutdownCommand), // Cannot mock lifecycle + statusHandler, + offerManager, + eventSink, + MoreExecutors.directExecutor(), + injectedLog, + statsProvider); + + } + + @Test + public void testRegistration() { + + storageUtil.expectOperations(); + + storageUtil.schedulerStore.saveFrameworkId(FRAMEWORK_ID); + expectLastCall(); + + eventSink.post(new PubsubEvent.DriverRegistered()); + + control.replay(); + + 
handler.handleRegistration(FRAMEWORK, MASTER); + } + + @Test + public void testReRegistration() { + control.replay(); + + handler.handleReregistration(MASTER); + assertEquals(1L, statsProvider.getLongValue("scheduler_framework_reregisters")); + } + + @Test + public void testGetEmptyOfferList() { + control.replay(); + + handler.handleOffers(ImmutableList.of()); + } + + private void expectOfferAttributesSaved(HostOffer offer) { + expect(storageUtil.attributeStore.getHostAttributes(offer.getOffer().getHostname())) + .andReturn(Optional.absent()); + IHostAttributes defaultMode = IHostAttributes.build( + Conversions.getAttributes(offer.getOffer()).newBuilder().setMode(NONE)); + expect(storageUtil.attributeStore.saveHostAttributes(defaultMode)).andReturn(true); + } + + @Test + public void testOffers() { + storageUtil.expectOperations(); + expectOfferAttributesSaved(HOST_OFFER); + offerManager.addOffer(HOST_OFFER); + + control.replay(); + + handler.handleOffers(ImmutableList.of(HOST_OFFER.getOffer())); + assertEquals(1L, statsProvider.getLongValue("scheduler_resource_offers")); + } + + @Test + public void testMultipleOffers() { + storageUtil.expectOperations(); + expectOfferAttributesSaved(HOST_OFFER); + expectOfferAttributesSaved(HOST_OFFER_2); + offerManager.addOffer(HOST_OFFER); + offerManager.addOffer(HOST_OFFER_2); + + control.replay(); + + handler.handleOffers(ImmutableList.of(HOST_OFFER.getOffer(), HOST_OFFER_2.getOffer())); + assertEquals(2L, statsProvider.getLongValue("scheduler_resource_offers")); + } + + @Test + public void testModePreservedWhenOfferAdded() { + storageUtil.expectOperations(); + + IHostAttributes draining = + IHostAttributes.build(HOST_OFFER.getAttributes().newBuilder().setMode(DRAINING)); + expect(storageUtil.attributeStore.getHostAttributes(AGENT_HOST)) + .andReturn(Optional.of(draining)); + + IHostAttributes saved = IHostAttributes.build( + Conversions.getAttributes(HOST_OFFER.getOffer()).newBuilder().setMode(DRAINING)); + + 
expect(storageUtil.attributeStore.saveHostAttributes(saved)).andReturn(true); + + // If the host is in draining, then the offer manager should get an offer with that attribute + offerManager.addOffer(DRAINING_HOST_OFFER); + + control.replay(); + handler.handleOffers(ImmutableList.of(HOST_OFFER.getOffer())); + assertEquals(1L, statsProvider.getLongValue("scheduler_resource_offers")); + + } + + @Test + public void testDisconnection() { + eventSink.post(new PubsubEvent.DriverDisconnected()); + + control.replay(); + + handler.handleDisconnection(); + assertEquals(1L, statsProvider.getLongValue("scheduler_framework_disconnects")); + } + + @Test + public void testRescind() { + offerManager.cancelOffer(OFFER_ID); + + control.replay(); + + handler.handleRescind(OFFER_ID); + assertEquals(1L, statsProvider.getLongValue("offers_rescinded")); + } + + @Test + public void testError() { + shutdownCommand.execute(); + expectLastCall(); + + control.replay(); + + handler.handleError("Something bad happened!"); + } + + @Test + public void testUpdate() { + eventSink.post(new PubsubEvent.TaskStatusReceived( + STATUS.getState(), + Optional.fromNullable(STATUS.getSource()), + Optional.fromNullable(STATUS.getReason()), + Optional.of(1000000L) + )); + statusHandler.statusUpdate(STATUS); + + control.replay(); + + handler.handleUpdate(STATUS); + } + + @Test + public void testUpdateNoSource() { + Protos.TaskStatus status = STATUS.toBuilder().clearSource().build(); + + eventSink.post(new PubsubEvent.TaskStatusReceived( + status.getState(), + Optional.absent(), + Optional.fromNullable(status.getReason()), + Optional.of(1000000L) + )); + statusHandler.statusUpdate(status); + + control.replay(); + + handler.handleUpdate(status); + } + + @Test + public void testUpdateNoReason() { + Protos.TaskStatus status = STATUS.toBuilder().clearReason().build(); + + eventSink.post(new PubsubEvent.TaskStatusReceived( + status.getState(), + Optional.fromNullable(status.getSource()), + Optional.absent(), + 
Optional.of(1000000L) + )); + statusHandler.statusUpdate(status); + + control.replay(); + + handler.handleUpdate(status); + } + + @Test + public void testUpdateNoMessage() { + Protos.TaskStatus status = STATUS.toBuilder().clearMessage().build(); + + eventSink.post(new PubsubEvent.TaskStatusReceived( + status.getState(), + Optional.fromNullable(status.getSource()), + Optional.fromNullable(status.getReason()), + Optional.of(1000000L) + )); + statusHandler.statusUpdate(status); + + control.replay(); + + handler.handleUpdate(status); + } + + @Test(expected = SchedulerException.class) + public void testUpdateWithException() { + eventSink.post(new PubsubEvent.TaskStatusReceived( + STATUS.getState(), + Optional.fromNullable(STATUS.getSource()), + Optional.fromNullable(STATUS.getReason()), + Optional.of(1000000L) + )); + statusHandler.statusUpdate(STATUS); + expectLastCall().andThrow(new Storage.StorageException("Storage Failure")); + + control.replay(); + + handler.handleUpdate(STATUS); + } + + @Test + public void testReconciliationUpdateLogging() { + // Mock the logger so we can test that it is logged at debug + createHandler(true); + String expectedMsg = "Received status update for task task-id in state TASK_RUNNING from " + + "SOURCE_AGENT with REASON_RECONCILIATION: message"; + + injectedLog.debug(expectedMsg); + expectLastCall().once(); + + eventSink.post(new PubsubEvent.TaskStatusReceived( + STATUS_RECONCILIATION.getState(), + Optional.fromNullable(STATUS_RECONCILIATION.getSource()), + Optional.fromNullable(STATUS_RECONCILIATION.getReason()), + Optional.of(1000000L) + )); + + statusHandler.statusUpdate(STATUS_RECONCILIATION); + + control.replay(); + + handler.handleUpdate(STATUS_RECONCILIATION); + } + + @Test + public void testLostAgent() { + control.replay(); + + handler.handleLostAgent(AGENT_ID); + assertEquals(1L, statsProvider.getLongValue("slaves_lost")); + } + + @Test + public void testLostExecutorIgnoresOkStatus() { + control.replay(); + + 
handler.handleLostExecutor(EXECUTOR_ID, AGENT_ID, 0); + assertEquals(0L, statsProvider.getLongValue("scheduler_lost_executors")); + } + + @Test + public void testLostExecutor() { + control.replay(); + + handler.handleLostExecutor(EXECUTOR_ID, AGENT_ID, 1); + assertEquals(1L, statsProvider.getLongValue("scheduler_lost_executors")); + } + + @Test + public void testMessage() { + // Framework messages should be ignored. + control.replay(); + + handler.handleMessage(EXECUTOR_ID, AGENT_ID); + } +}
