Repository: aurora Updated Branches: refs/heads/master 33acb899b -> c32f14c75
Support Mesos Maintenance This adds support for Mesos Maintenance per the design doc[1]. Per the design the scheduler gains another parameter, `unavailability_threshold`. With this threshold the scheduler does the following: 1. Accept all inverse offers from Mesos. 2. Drain when accepting an inverse offer if the unavailability starts within the threshold. 3. Veto any offers with unavailability starting within the threshold. 4. Penalize offers that have unavailability information. For readability and safety the time based code uses the new `java.time` package in Java 8, primarily relying on the `Instant` class. [1]: https://docs.google.com/document/d/1Z7dFAm6I1nrBE9S5WHw0D0LApBumkIbHrk0-ceoD2YI/edit#heading=h.n5tvzjaj9llx Testing Done: e2e tests Bugs closed: AURORA-1904 Reviewed at https://reviews.apache.org/r/57717/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/c32f14c7 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/c32f14c7 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/c32f14c7 Branch: refs/heads/master Commit: c32f14c75e7f8620aedb12d38aee8ca1d2427724 Parents: 33acb89 Author: Zameer Manji <[email protected]> Authored: Thu Mar 23 14:17:40 2017 -0700 Committer: Zameer Manji <[email protected]> Committed: Thu Mar 23 14:17:40 2017 -0700 ---------------------------------------------------------------------- .../org/apache/aurora/common/util/Clock.java | 17 +++- .../aurora/common/util/testing/FakeClock.java | 6 ++ examples/vagrant/upstart/aurora-scheduler.conf | 3 +- .../aurora/benchmark/fakes/FakeDriver.java | 5 ++ .../org/apache/aurora/scheduler/HostOffer.java | 11 +++ .../aurora/scheduler/base/Conversions.java | 17 +++- .../scheduler/filter/SchedulingFilter.java | 19 ++++- .../scheduler/filter/SchedulingFilterImpl.java | 39 ++++++++- .../apache/aurora/scheduler/mesos/Driver.java | 8 ++ .../scheduler/mesos/MesosCallbackHandler.java | 72 ++++++++++++++-- 
.../scheduler/mesos/SchedulerDriverService.java | 5 ++ .../mesos/VersionedMesosSchedulerImpl.java | 8 +- .../mesos/VersionedSchedulerDriverService.java | 18 ++++ .../aurora/scheduler/offers/OfferManager.java | 24 ++++-- .../aurora/scheduler/offers/OfferSettings.java | 2 - .../aurora/scheduler/offers/OffersModule.java | 42 +++++++++ .../preemptor/PreemptionVictimFilter.java | 9 +- .../scheduler/state/MaintenanceController.java | 76 +++++++++++++---- .../aurora/scheduler/state/TaskAssigner.java | 5 +- .../filter/SchedulingFilterImplTest.java | 58 ++++++++++++- .../mesos/MesosCallbackHandlerTest.java | 55 +++++++++++- .../VersionedSchedulerDriverServiceTest.java | 27 ++++++ .../scheduler/offers/OfferManagerImplTest.java | 41 ++++++++- .../preemptor/PreemptionVictimFilterTest.java | 21 +++-- .../state/MaintenanceControllerImplTest.java | 40 +++++++++ .../e2e/generate_mesos_maintenance_schedule.py | 43 ++++++++++ .../apache/aurora/e2e/http/http_example.aurora | 3 + .../sh/org/apache/aurora/e2e/test_end_to_end.sh | 89 ++++++++++++++++++++ 28 files changed, 696 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/commons/src/main/java/org/apache/aurora/common/util/Clock.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/Clock.java b/commons/src/main/java/org/apache/aurora/common/util/Clock.java index 5c4ced1..825a8c5 100644 --- a/commons/src/main/java/org/apache/aurora/common/util/Clock.java +++ b/commons/src/main/java/org/apache/aurora/common/util/Clock.java @@ -13,6 +13,8 @@ */ package org.apache.aurora.common.util; +import java.time.Instant; + /** * An abstraction of the system clock. 
* @author John Sirois @@ -21,7 +23,6 @@ public interface Clock { // TODO(zmanji): Consider replacing this with java.time.Clock /** * A clock that returns the the actual time reported by the system. - * This clock is guaranteed to be serializable. */ Clock SYSTEM_CLOCK = new Clock() { @Override public long nowMillis() { @@ -33,6 +34,9 @@ public interface Clock { @Override public void waitFor(long millis) throws InterruptedException { Thread.sleep(millis); } + @Override public Instant nowInstant() { + return Instant.now(); + } }; /** @@ -44,7 +48,7 @@ public interface Clock { long nowMillis(); /** - * Returns the current time in nanoseconds. Should be used only for relative timing. + * Returns the current time in nanoseconds. Should be used only for relative timing. * See {@code System.nanoTime()} for tips on using the value returned here. * * @return A measure of the current time in nanoseconds. @@ -59,5 +63,12 @@ public interface Clock { * @throws InterruptedException if this wait was interrupted */ void waitFor(long millis) throws InterruptedException; -} + /** + * Returns the current time as an java.time.Instant. + * + * @return the Instant representing the current time. 
+ * @see Instant#now() + */ + Instant nowInstant(); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/commons/src/main/java/org/apache/aurora/common/util/testing/FakeClock.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/testing/FakeClock.java b/commons/src/main/java/org/apache/aurora/common/util/testing/FakeClock.java index 104f2c6..d1eec2b 100644 --- a/commons/src/main/java/org/apache/aurora/common/util/testing/FakeClock.java +++ b/commons/src/main/java/org/apache/aurora/common/util/testing/FakeClock.java @@ -13,6 +13,7 @@ */ package org.apache.aurora.common.util.testing; +import java.time.Instant; import java.util.concurrent.TimeUnit; import com.google.common.base.Preconditions; @@ -76,4 +77,9 @@ public class FakeClock implements Clock { public void waitFor(long millis) { advance(Amount.of(millis, Time.MILLISECONDS)); } + + @Override + public Instant nowInstant() { + return Instant.ofEpochMilli(nowMillis()); + } } http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/examples/vagrant/upstart/aurora-scheduler.conf ---------------------------------------------------------------------- diff --git a/examples/vagrant/upstart/aurora-scheduler.conf b/examples/vagrant/upstart/aurora-scheduler.conf index 31fa036..63fcc87 100644 --- a/examples/vagrant/upstart/aurora-scheduler.conf +++ b/examples/vagrant/upstart/aurora-scheduler.conf @@ -55,4 +55,5 @@ exec bin/aurora-scheduler \ -allow_gpu_resource=true \ -allow_container_volumes=true \ -offer_filter_duration=0secs \ - -mesos_driver=V1_DRIVER + -mesos_driver=V1_DRIVER \ + -unavailability_threshold=1mins http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java 
b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java index 45f59c0..2f47a13 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java +++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java @@ -33,6 +33,11 @@ public class FakeDriver extends AbstractIdleService implements Driver { } @Override + public void acceptInverseOffer(Protos.OfferID offerID, Protos.Filters filter) { + // no-op + } + + @Override public void declineOffer(Protos.OfferID offerId, Protos.Filters filters) { // no-op } http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/HostOffer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/HostOffer.java b/src/main/java/org/apache/aurora/scheduler/HostOffer.java index 23f0600..bc40d07 100644 --- a/src/main/java/org/apache/aurora/scheduler/HostOffer.java +++ b/src/main/java/org/apache/aurora/scheduler/HostOffer.java @@ -13,13 +13,16 @@ */ package org.apache.aurora.scheduler; +import java.time.Instant; import java.util.Objects; import com.google.common.base.MoreObjects; +import com.google.common.base.Optional; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import org.apache.aurora.scheduler.base.Conversions; import org.apache.aurora.scheduler.resources.ResourceBag; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; @@ -61,6 +64,14 @@ public class HostOffer { return resourceBagCache.getUnchecked(tierInfo); } + public Optional<Instant> getUnavailabilityStart() { + if (offer.hasUnavailability()) { + return Optional.of(Conversions.getStart(offer.getUnavailability())); + } else { + return Optional.absent(); + } + } + @Override public boolean equals(Object o) { if (!(o instanceof HostOffer)) { 
http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/base/Conversions.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/base/Conversions.java b/src/main/java/org/apache/aurora/scheduler/base/Conversions.java index 8295216..33cc012 100644 --- a/src/main/java/org/apache/aurora/scheduler/base/Conversions.java +++ b/src/main/java/org/apache/aurora/scheduler/base/Conversions.java @@ -13,9 +13,11 @@ */ package org.apache.aurora.scheduler.base; +import java.time.Instant; import java.util.Collection; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; import com.google.common.base.Function; import com.google.common.base.Preconditions; @@ -33,11 +35,13 @@ import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.mesos.v1.Protos; import org.apache.mesos.v1.Protos.Offer; import org.apache.mesos.v1.Protos.TaskState; +import org.apache.mesos.v1.Protos.Unavailability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Collection of utility functions to convert mesos protobuf types to internal thrift types. + * Collection of utility functions to convert mesos protobuf types to internal thrift types or + * Java types. */ public final class Conversions { @@ -152,4 +156,15 @@ public final class Conversions { .transform(ATTRIBUTE_NAME) .anyMatch(Predicates.equalTo(ConfigurationManager.DEDICATED_ATTRIBUTE)); } + + /** + * Converts the start of an Unavailability proto to an Instant. + * + * @param unavailability Unavailability information from Mesos. + * @return The java.time.Instant of the start. 
+ */ + public static Instant getStart(Unavailability unavailability) { + long ns = unavailability.getStart().getNanoseconds(); + return Instant.ofEpochMilli(TimeUnit.NANOSECONDS.toMillis(ns)); + } } http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java index bb1a960..36608a9 100644 --- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java +++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java @@ -13,10 +13,13 @@ */ package org.apache.aurora.scheduler.filter; +import java.time.Instant; import java.util.Objects; import java.util.Set; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; +import com.google.common.base.Optional; import org.apache.aurora.scheduler.resources.ResourceBag; import org.apache.aurora.scheduler.storage.entities.IConstraint; @@ -248,10 +251,17 @@ public interface SchedulingFilter { class UnusedResource { private final ResourceBag offer; private final IHostAttributes attributes; + private final Optional<Instant> unavailabilityStart; + @VisibleForTesting public UnusedResource(ResourceBag offer, IHostAttributes attributes) { + this(offer, attributes, Optional.absent()); + } + + public UnusedResource(ResourceBag offer, IHostAttributes attributes, Optional<Instant> start) { this.offer = offer; this.attributes = attributes; + this.unavailabilityStart = start; } public ResourceBag getResourceBag() { @@ -262,6 +272,10 @@ public interface SchedulingFilter { return attributes; } + public Optional<Instant> getUnavailabilityStart() { + return unavailabilityStart; + } + @Override public boolean equals(Object o) { if (!(o instanceof UnusedResource)) { @@ -270,12 +284,13 @@ 
public interface SchedulingFilter { UnusedResource other = (UnusedResource) o; return Objects.equals(offer, other.offer) - && Objects.equals(attributes, other.attributes); + && Objects.equals(attributes, other.attributes) + && Objects.equals(unavailabilityStart, other.unavailabilityStart); } @Override public int hashCode() { - return Objects.hash(offer, attributes); + return Objects.hash(offer, attributes, unavailabilityStart); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java index 60097d9..df51d4c 100644 --- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java @@ -13,9 +13,11 @@ */ package org.apache.aurora.scheduler.filter; +import java.time.Instant; import java.util.Comparator; import java.util.EnumSet; import java.util.Set; +import javax.inject.Inject; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; @@ -24,15 +26,21 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; import org.apache.aurora.common.inject.TimedInterceptor.Timed; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.util.Clock; import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.gen.TaskConstraint; import org.apache.aurora.scheduler.configuration.ConfigurationManager; +import org.apache.aurora.scheduler.offers.OffersModule.UnavailabilityThreshold; import org.apache.aurora.scheduler.resources.ResourceBag; import org.apache.aurora.scheduler.resources.ResourceType; import 
org.apache.aurora.scheduler.storage.entities.IAttribute; import org.apache.aurora.scheduler.storage.entities.IConstraint; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import static java.util.Objects.requireNonNull; + import static org.apache.aurora.gen.MaintenanceMode.DRAINED; import static org.apache.aurora.gen.MaintenanceMode.DRAINING; import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE; @@ -42,6 +50,15 @@ import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DED * fulfilled, and that tasks are allowed to run on the given machine. */ public class SchedulingFilterImpl implements SchedulingFilter { + private final Amount<Long, Time> unavailabilityThreshold; + private final Clock clock; + + @Inject + public SchedulingFilterImpl(@UnavailabilityThreshold Amount<Long, Time> threshold, Clock clock) { + this.unavailabilityThreshold = requireNonNull(threshold); + this.clock = requireNonNull(clock); + } + private static final Set<MaintenanceMode> VETO_MODES = EnumSet.of(DRAINING, DRAINED); @VisibleForTesting @@ -103,12 +120,24 @@ public class SchedulingFilterImpl implements SchedulingFilter { return Optional.absent(); } - private Optional<Veto> getMaintenanceVeto(MaintenanceMode mode) { + private Optional<Veto> getAuroraMaintenanceVeto(MaintenanceMode mode) { return VETO_MODES.contains(mode) ? 
Optional.of(Veto.maintenance(mode.toString().toLowerCase())) : Optional.absent(); } + private Optional<Veto> getMesosMaintenanceVeto(Optional<Instant> unavailabilityStart) { + if (unavailabilityStart.isPresent()) { + Instant start = unavailabilityStart.get(); + Instant drainTime = start.minusMillis(unavailabilityThreshold.as(Time.MILLISECONDS)); + + if (clock.nowInstant().isAfter(drainTime)) { + return Optional.of(Veto.maintenance(DRAINING.toString().toLowerCase())); + } + } + return Optional.absent(); + } + private boolean isDedicated(IHostAttributes attributes) { return Iterables.any( attributes.getAttributes(), @@ -130,11 +159,17 @@ public class SchedulingFilterImpl implements SchedulingFilter { } // 2. Host maintenance check. - Optional<Veto> maintenanceVeto = getMaintenanceVeto(resource.getAttributes().getMode()); + Optional<Veto> maintenanceVeto = getAuroraMaintenanceVeto(resource.getAttributes().getMode()); if (maintenanceVeto.isPresent()) { return maintenanceVeto.asSet(); } + Optional<Veto> mesosMaintenanceVeto = + getMesosMaintenanceVeto(resource.getUnavailabilityStart()); + if (mesosMaintenanceVeto.isPresent()) { + return mesosMaintenanceVeto.asSet(); + } + // 3. Value and limit constraint check. Optional<Veto> constraintVeto = getConstraintVeto( request.getConstraints(), http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java b/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java index 71547ce..be5dd45 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java @@ -42,6 +42,14 @@ public interface Driver extends Service { void acceptOffers(OfferID offerId, Collection<Operation> operations, Filters filter); /** + * Accepts an inverse offer. 
+ * + * @param offerID ID of the inverse offer. + * @param filter offer filter to apply. + */ + void acceptInverseOffer(OfferID offerID, Filters filter); + + /** * Declines a resource offer. * * @param offerId ID of the offer to decline. http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java index 801551b..800edfa 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java @@ -15,6 +15,8 @@ package org.apache.aurora.scheduler.mesos; import java.lang.annotation.Retention; import java.lang.annotation.Target; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; @@ -26,19 +28,27 @@ import com.google.common.base.Function; import com.google.common.base.Optional; import org.apache.aurora.common.application.Lifecycle; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.stats.StatsProvider; +import org.apache.aurora.common.util.Clock; import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.TaskStatusHandler; +import org.apache.aurora.scheduler.base.Conversions; import org.apache.aurora.scheduler.base.SchedulerException; import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.events.PubsubEvent; import org.apache.aurora.scheduler.offers.OfferManager; +import org.apache.aurora.scheduler.offers.OffersModule; +import org.apache.aurora.scheduler.state.MaintenanceController; import 
org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.mesos.v1.Protos.AgentID; import org.apache.mesos.v1.Protos.ExecutorID; +import org.apache.mesos.v1.Protos.Filters; import org.apache.mesos.v1.Protos.FrameworkID; +import org.apache.mesos.v1.Protos.InverseOffer; import org.apache.mesos.v1.Protos.MasterInfo; import org.apache.mesos.v1.Protos.Offer; import org.apache.mesos.v1.Protos.OfferID; @@ -70,9 +80,9 @@ public interface MesosCallbackHandler { void handleUpdate(TaskStatus status); void handleLostAgent(AgentID agentId); void handleLostExecutor(ExecutorID executorID, AgentID slaveID, int status); + void handleInverseOffer(List<InverseOffer> offers); class MesosCallbackHandlerImpl implements MesosCallbackHandler { - private final TaskStatusHandler taskStatusHandler; private final OfferManager offerManager; private final Storage storage; @@ -80,11 +90,16 @@ public interface MesosCallbackHandler { private final EventSink eventSink; private final Executor executor; private final Logger log; + private final Driver driver; + private final Clock clock; + private final MaintenanceController maintenanceController; + private final Amount<Long, Time> unavailabilityThreshold; private final AtomicLong offersRescinded; private final AtomicLong slavesLost; private final AtomicLong reRegisters; - private final AtomicLong offersRecieved; + private final AtomicLong offersReceived; + private final AtomicLong inverseOffersReceived; private final AtomicLong disconnects; private final AtomicLong executorsLost; @@ -114,7 +129,11 @@ public interface MesosCallbackHandler { OfferManager offerManager, EventSink eventSink, @SchedulerExecutor Executor executor, - StatsProvider statsProvider) { + StatsProvider statsProvider, + Driver driver, + Clock clock, + MaintenanceController controller, + @OffersModule.UnavailabilityThreshold Amount<Long, Time> 
unavailabilityThreshold) { this( storage, @@ -124,7 +143,11 @@ public interface MesosCallbackHandler { eventSink, executor, LoggerFactory.getLogger(MesosCallbackHandlerImpl.class), - statsProvider); + statsProvider, + driver, + clock, + controller, + unavailabilityThreshold); } @VisibleForTesting @@ -136,7 +159,11 @@ public interface MesosCallbackHandler { EventSink eventSink, Executor executor, Logger log, - StatsProvider statsProvider) { + StatsProvider statsProvider, + Driver driver, + Clock clock, + MaintenanceController maintenanceController, + Amount<Long, Time> unavailabilityThreshold) { this.storage = requireNonNull(storage); this.lifecycle = requireNonNull(lifecycle); @@ -145,11 +172,16 @@ public interface MesosCallbackHandler { this.eventSink = requireNonNull(eventSink); this.executor = requireNonNull(executor); this.log = requireNonNull(log); + this.driver = requireNonNull(driver); + this.clock = requireNonNull(clock); + this.maintenanceController = requireNonNull(maintenanceController); + this.unavailabilityThreshold = requireNonNull(unavailabilityThreshold); this.offersRescinded = statsProvider.makeCounter("offers_rescinded"); this.slavesLost = statsProvider.makeCounter("slaves_lost"); this.reRegisters = statsProvider.makeCounter("scheduler_framework_reregisters"); - this.offersRecieved = statsProvider.makeCounter("scheduler_resource_offers"); + this.offersReceived = statsProvider.makeCounter("scheduler_resource_offers"); + this.inverseOffersReceived = statsProvider.makeCounter("scheduler_inverse_offers"); this.disconnects = statsProvider.makeCounter("scheduler_framework_disconnects"); this.executorsLost = statsProvider.makeCounter("scheduler_lost_executors"); } @@ -186,7 +218,7 @@ public interface MesosCallbackHandler { AttributeStore.Util.mergeOffer(storeProvider.getAttributeStore(), offer); storeProvider.getAttributeStore().saveHostAttributes(attributes); log.debug("Received offer: {}", offer); - offersRecieved.incrementAndGet(); + 
offersReceived.incrementAndGet(); offerManager.addOffer(new HostOffer(offer, attributes)); } }); @@ -202,7 +234,7 @@ public interface MesosCallbackHandler { @Override public void handleRescind(OfferID offerId) { - log.info("Offer rescinded: " + offerId); + log.info("Offer rescinded: {}", offerId.getValue()); offerManager.cancelOffer(offerId); offersRescinded.incrementAndGet(); } @@ -284,5 +316,29 @@ public interface MesosCallbackHandler { executorsLost.incrementAndGet(); } } + + @Override + public void handleInverseOffer(List<InverseOffer> offers) { + if (offers.isEmpty()) { + return; + } + + executor.execute(() -> { + for (InverseOffer offer: offers) { + inverseOffersReceived.incrementAndGet(); + log.debug("Received inverse offer: {}", offer); + // Use the default filter for accepting inverse offers. + driver.acceptInverseOffer(offer.getId(), Filters.newBuilder().build()); + + Instant start = Conversions.getStart(offer.getUnavailability()); + Instant drainTime = start + .minus(unavailabilityThreshold.as(Time.MILLISECONDS), ChronoUnit.MILLIS); + + if (clock.nowInstant().isAfter(drainTime)) { + maintenanceController.drainForInverseOffer(offer); + } + } + }); + } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java index 4a604f5..e0221f8 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java @@ -140,6 +140,11 @@ class SchedulerDriverService extends AbstractIdleService implements Driver { } @Override + public void acceptInverseOffer(Protos.OfferID offerID, Protos.Filters filter) { + throw new 
UnsupportedOperationException("SchedulerDriver does not support inverse offers"); + } + + @Override public void declineOffer(Protos.OfferID offerId, Protos.Filters filter) { ensureRunning(); http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java index a519c7d..67d356a 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java @@ -13,15 +13,12 @@ */ package org.apache.aurora.scheduler.mesos; -import java.util.List; import javax.inject.Inject; -import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; -import com.google.common.collect.Lists; import org.apache.aurora.common.inject.TimedInterceptor; import org.apache.aurora.scheduler.stats.CachedCounters; @@ -147,10 +144,7 @@ public class VersionedMesosSchedulerImpl implements Scheduler { break; case INVERSE_OFFERS: - List<Protos.InverseOffer> offers = event.getInverseOffers().getInverseOffersList(); - String ids = Joiner.on(",").join( - Lists.transform(offers, input -> input.getId().getValue())); - LOG.warn("Ignoring inverse offers: {}", ids); + handler.handleInverseOffer(event.getInverseOffers().getInverseOffersList()); break; case RESCIND_INVERSE_OFFER: http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java index f1326ea..5e86504 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java @@ -137,6 +137,24 @@ class VersionedSchedulerDriverService extends AbstractIdleService } @Override + public void acceptInverseOffer(OfferID offerID, Filters filter) { + whenRegistered(() -> { + LOG.info("Accepting Inverse Offer {}", offerID.getValue()); + + Futures.getUnchecked(mesosFuture).send( + Call.newBuilder() + .setFrameworkId(getFrameworkId()) + .setType(Call.Type.ACCEPT_INVERSE_OFFERS) + .setAcceptInverseOffers( + Call.AcceptInverseOffers.newBuilder() + .addInverseOfferIds(offerID) + .setFilters(filter) + ) + .build()); + }); + } + + @Override public void declineOffer(OfferID offerId, Filters filter) { whenRegistered(() -> { LOG.info("Declining offer {}", offerId.getValue()); http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java index 8c000cb..78255e6 100644 --- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java +++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java @@ -13,6 +13,7 @@ */ package org.apache.aurora.scheduler.offers; +import java.time.Instant; import java.util.Comparator; import java.util.Map; import java.util.Set; @@ -22,7 +23,6 @@ import java.util.concurrent.atomic.AtomicLong; import javax.inject.Inject; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.base.Optional; 
import com.google.common.collect.FluentIterable; import com.google.common.collect.HashMultimap; @@ -37,7 +37,6 @@ import com.google.common.eventbus.Subscribe; import org.apache.aurora.common.inject.TimedInterceptor.Timed; import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.stats.StatsProvider; -import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; import org.apache.aurora.scheduler.async.DelayExecutor; @@ -272,15 +271,22 @@ public interface OfferManager extends EventSubscriber { * the different indices used and their consistency. */ private static class HostOffers { + private static final Ordering<HostOffer> AURORA_MAINTENANCE_COMPARATOR = + Ordering.explicit(NONE, SCHEDULED, DRAINING, DRAINED) + .onResultOf(offer -> offer.getAttributes().getMode()); + // We should not prefer offers from agents that are scheduled to become unavailable. + // We should also sort the unavailability start to prefer agents that are starting + // maintenance later. + private static final Ordering<HostOffer> MESOS_MAINTENANCE_COMPARATOR = + Ordering + .natural() + .reverse() + .onResultOf(o -> o.getUnavailabilityStart().or(Instant.MAX)); + private static final Comparator<HostOffer> PREFERENCE_COMPARATOR = // Currently, the only preference is based on host maintenance status. 
- Ordering.explicit(NONE, SCHEDULED, DRAINING, DRAINED) - .onResultOf(new Function<HostOffer, MaintenanceMode>() { - @Override - public MaintenanceMode apply(HostOffer offer) { - return offer.getAttributes().getMode(); - } - }) + AURORA_MAINTENANCE_COMPARATOR + .compound(MESOS_MAINTENANCE_COMPARATOR) .compound(Ordering.arbitrary()); private final Set<HostOffer> offers = new ConcurrentSkipListSet<>(PREFERENCE_COMPARATOR); http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java index e16e36e..adf7f33 100644 --- a/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java +++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java @@ -13,7 +13,6 @@ */ package org.apache.aurora.scheduler.offers; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Supplier; import org.apache.aurora.common.quantity.Amount; @@ -24,7 +23,6 @@ import static java.util.Objects.requireNonNull; /** * Settings required to create an OfferManager. 
*/ -@VisibleForTesting public class OfferSettings { private final Amount<Long, Time> offerFilterDuration; http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java b/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java index 202cae9..317a2d2 100644 --- a/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java +++ b/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java @@ -13,10 +13,14 @@ */ package org.apache.aurora.scheduler.offers; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; +import javax.inject.Qualifier; import javax.inject.Singleton; import com.google.inject.AbstractModule; import com.google.inject.PrivateModule; +import com.google.inject.TypeLiteral; import org.apache.aurora.common.args.Arg; import org.apache.aurora.common.args.CmdLine; @@ -25,11 +29,19 @@ import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.util.Random; import org.apache.aurora.scheduler.events.PubsubEventModule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; /** * Binding module for resource offer management. 
*/ public class OffersModule extends AbstractModule { + private static final Logger LOG = LoggerFactory.getLogger(OffersModule.class); @CmdLine(name = "min_offer_hold_time", help = "Minimum amount of time to hold a resource offer before declining.") @@ -50,8 +62,37 @@ public class OffersModule extends AbstractModule { private static final Arg<Amount<Long, Time>> OFFER_FILTER_DURATION = Arg.create(Amount.of(5L, Time.SECONDS)); + @CmdLine(name = "unavailability_threshold", + help = "Threshold time, when running tasks should be drained from a host, before a host " + + "becomes unavailable. Should be greater than min_offer_hold_time + " + + "offer_hold_jitter_window.") + private static final Arg<Amount<Long, Time>> UNAVAILABILITY_THRESHOLD = + Arg.create(Amount.of(6L, Time.MINUTES)); + + /** + * Binding annotation for the threshold to veto tasks with unavailability. + */ + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + public @interface UnavailabilityThreshold { } + @Override protected void configure() { + long offerHoldTime = OFFER_HOLD_JITTER_WINDOW.get().as(Time.SECONDS) + + MIN_OFFER_HOLD_TIME.get().as(Time.SECONDS); + if (UNAVAILABILITY_THRESHOLD.get().as(Time.SECONDS) < offerHoldTime) { + LOG.warn("unavailability_threshold ({}) is less than the sum of min_offer_hold_time ({}) and" + + "offer_hold_jitter_window ({}). 
This creates risks of races between launching and" + + "draining", + UNAVAILABILITY_THRESHOLD.get(), + MIN_OFFER_HOLD_TIME.get(), + OFFER_HOLD_JITTER_WINDOW.get()); + } + + bind(new TypeLiteral<Amount<Long, Time>>() { }) + .annotatedWith(UnavailabilityThreshold.class) + .toInstance(UNAVAILABILITY_THRESHOLD.get()); + install(new PrivateModule() { @Override protected void configure() { @@ -65,6 +106,7 @@ public class OffersModule extends AbstractModule { bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class); bind(OfferManager.OfferManagerImpl.class).in(Singleton.class); expose(OfferManager.class); + expose(OfferSettings.class); } }); PubsubEventModule.bindSubscriber(binder(), OfferManager.class); http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java index ba49e7a..5ed578c 100644 --- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java @@ -13,6 +13,7 @@ */ package org.apache.aurora.scheduler.preemptor; +import java.time.Instant; import java.util.List; import java.util.Set; @@ -214,8 +215,14 @@ public interface PreemptionVictimFilter { for (PreemptionVictim victim : sortedVictims) { toPreemptTasks.add(victim); totalResource = totalResource.add(victimToResources.apply(victim)); + + Optional<Instant> unavailability = Optional.absent(); + if (offer.isPresent()) { + unavailability = offer.get().getUnavailabilityStart(); + } + Set<Veto> vetoes = schedulingFilter.filter( - new UnusedResource(totalResource, attributes.get()), + new UnusedResource(totalResource, attributes.get(), unavailability), new ResourceRequest( 
pendingTask, ResourceManager.bagFromResources(pendingTask.getResources()).add(overhead), http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java index 574efc9..1edb252 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java +++ b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java @@ -42,6 +42,7 @@ import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IHostStatus; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.mesos.v1.Protos; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,7 +56,6 @@ import static org.apache.aurora.gen.MaintenanceMode.DRAINING; * All state-changing functions return their results. Additionally, all state-changing functions * will ignore requests to change state of unknown hosts and subsequently omit these hosts from * return values. - * TODO(wfarner): Convert use of HostStatus in this API to IHostStatus (immutable). */ public interface MaintenanceController { @@ -80,6 +80,14 @@ public interface MaintenanceController { Set<IHostStatus> drain(Set<String> hosts); /** + * Drain tasks defined by the inverse offer. + * This method doesn't set any host attributes. + * + * @param inverseOffer the inverse offer to use. + */ + void drainForInverseOffer(Protos.InverseOffer inverseOffer); + + /** * Fetches the current maintenance mode of {$code host}. * * @param host Host to fetch state for. 
@@ -120,28 +128,40 @@ public interface MaintenanceController { this.batchWorker = requireNonNull(batchWorker); } + private Set<String> drainTasksOnHost(String host, MutableStoreProvider store) { + Query.Builder query = Query.slaveScoped(host).active(); + Set<String> activeTasks = FluentIterable.from(store.getTaskStore().fetchTasks(query)) + .transform(Tasks::id) + .toSet(); + + if (activeTasks.isEmpty()) { + LOG.info("No tasks to drain on host: {}", host); + // Simple way to avoid the log message if there are no tasks. + return activeTasks; + } else { + LOG.info("Draining tasks: {} on host: {}", activeTasks, host); + for (String taskId : activeTasks) { + stateManager.changeState( + store, + taskId, + Optional.absent(), + ScheduleStatus.DRAINING, + DRAINING_MESSAGE); + } + + return activeTasks; + } + + } + private Set<IHostStatus> watchDrainingTasks(MutableStoreProvider store, Set<String> hosts) { LOG.info("Hosts to drain: " + hosts); Set<String> emptyHosts = Sets.newHashSet(); for (String host : hosts) { + Set<String> drainedTasks = drainTasksOnHost(host, store); // If there are no tasks on the host, immediately transition to DRAINED. 
- Query.Builder query = Query.slaveScoped(host).active(); - Set<String> activeTasks = FluentIterable.from(store.getTaskStore().fetchTasks(query)) - .transform(Tasks::id) - .toSet(); - if (activeTasks.isEmpty()) { - LOG.info("No tasks to drain for host: " + host); + if (drainedTasks.isEmpty()) { emptyHosts.add(host); - } else { - LOG.info("Draining tasks: {} on host: {}", activeTasks, host); - for (String taskId : activeTasks) { - stateManager.changeState( - store, - taskId, - Optional.absent(), - ScheduleStatus.DRAINING, - DRAINING_MESSAGE); - } } } @@ -195,6 +215,28 @@ public interface MaintenanceController { return storage.write(store -> watchDrainingTasks(store, hosts)); } + private Optional<String> getHostname(Protos.InverseOffer offer) { + if (offer.getUrl().getAddress().hasHostname()) { + return Optional.of(offer.getUrl().getAddress().getHostname()); + } else { + return Optional.absent(); + } + } + + @Override + public void drainForInverseOffer(Protos.InverseOffer offer) { + // TaskStore does not allow for querying by agent id. 
+ Optional<String> hostname = getHostname(offer); + + if (hostname.isPresent()) { + String host = hostname.get(); + storage.write(storeProvider -> drainTasksOnHost(host, storeProvider)); + } else { + LOG.error("Unable to drain tasks on agent {} because " + + "no hostname attached to inverse offer {}.", offer.getAgentId(), offer.getId()); + } + } + private static final Function<IHostAttributes, String> HOST_NAME = IHostAttributes::getHost; http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java index da378e8..a177b30 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java +++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java @@ -171,7 +171,10 @@ public interface TaskAssigner { } Set<Veto> vetoes = filter.filter( - new UnusedResource(offer.getResourceBag(tierInfo), offer.getAttributes()), + new UnusedResource( + offer.getResourceBag(tierInfo), + offer.getAttributes(), + offer.getUnavailabilityStart()), resourceRequest); if (vetoes.isEmpty()) { http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java index 1d7f9f4..5915e47 100644 --- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java @@ -13,15 +13,20 @@ */ package org.apache.aurora.scheduler.filter; +import java.time.Instant; import java.util.Arrays; import 
java.util.Set; +import com.google.common.base.Optional; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.common.util.testing.FakeClock; import org.apache.aurora.gen.Attribute; import org.apache.aurora.gen.Constraint; import org.apache.aurora.gen.ExecutorConfig; @@ -87,11 +92,13 @@ public class SchedulingFilterImplTest extends EasyMockTest { mesosScalar(DISK_MB, DEFAULT_DISK), mesosRange(PORTS, 80, 81))); + private static final Amount<Long, Time> UNAVAILABILITY_THRESHOLD = Amount.of(2L, Time.MINUTES); + private final FakeClock clock = new FakeClock(); private SchedulingFilter defaultFilter; @Before public void setUp() { - defaultFilter = new SchedulingFilterImpl(); + defaultFilter = new SchedulingFilterImpl(UNAVAILABILITY_THRESHOLD, clock); } @Test @@ -217,6 +224,55 @@ public class SchedulingFilterImplTest extends EasyMockTest { } @Test + public void testDrainingMesosMaintenance() { + // Start the test at minute 8 + clock.advance(Amount.of(8L, Time.MINUTES)); + + // The agent will go down at minute 9 + // this is less than the threshold of two minutes + + Instant start = Instant.ofEpochMilli(Amount.of(9L, Time.MINUTES).as(Time.MILLISECONDS)); + + ITaskConfig task = makeTask(); + UnusedResource unusedResource = new UnusedResource( + DEFAULT_OFFER, + hostAttributes(HOST_A), + Optional.of(start)); + ResourceRequest request = new ResourceRequest(task, bag(task), empty()); + + control.replay(); + + assertEquals( + ImmutableSet.of(Veto.maintenance("draining")), + defaultFilter.filter(unusedResource, request)); + } + + @Test + public void testNotVetoingWithMesosMaintenace() { + // Start the test at minute 8 + clock.advance(Amount.of(8L, Time.MINUTES)); + + // The 
agent will go down at minute 100 + // this is greater than the threshold of two minutes + + Instant start = Instant.ofEpochMilli(Amount.of(100L, Time.MINUTES).as(Time.MILLISECONDS)); + + ITaskConfig task = makeTask(); + UnusedResource unusedResource = new UnusedResource( + DEFAULT_OFFER, + hostAttributes(HOST_A), + Optional.of(start)); + ResourceRequest request = new ResourceRequest(task, bag(task), empty()); + + control.replay(); + + assertEquals( + ImmutableSet.of(), + defaultFilter.filter(unusedResource, request)); + + } + + @Test public void testHostDrainingForMaintenance() { control.replay(); http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java index 80f631e..52041b5 100644 --- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java +++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java @@ -21,7 +21,10 @@ import com.google.common.util.concurrent.MoreExecutors; import org.apache.aurora.common.application.Lifecycle; import org.apache.aurora.common.base.Command; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.common.util.testing.FakeClock; import org.apache.aurora.gen.HostAttributes; import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.TaskStatusHandler; @@ -30,6 +33,7 @@ import org.apache.aurora.scheduler.base.SchedulerException; import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.events.PubsubEvent; import org.apache.aurora.scheduler.offers.OfferManager; +import 
org.apache.aurora.scheduler.state.MaintenanceController; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; @@ -131,26 +135,44 @@ public class MesosCallbackHandlerTest extends EasyMockTest { .setReason(Protos.TaskStatus.Reason.REASON_RECONCILIATION) .build(); + private static final Amount<Long, Time> DRAIN_THRESHOLD = Amount.of(2L, Time.MINUTES); + + private static final Protos.InverseOffer INVERSE_OFFER = Protos.InverseOffer.newBuilder() + .setAgentId(AGENT_ID) + .setFrameworkId(FRAMEWORK) + .setId(OFFER_ID) + .setUnavailability(Protos.Unavailability.newBuilder() + .setStart(Protos.TimeInfo.newBuilder() + .setNanoseconds(300000000000L) + )) + .build(); + + private static final Protos.Filters FILTER = Protos.Filters.newBuilder().build(); + private StorageTestUtil storageUtil; private Command shutdownCommand; private TaskStatusHandler statusHandler; private OfferManager offerManager; private EventSink eventSink; private FakeStatsProvider statsProvider; + private Driver driver; private Logger injectedLog; + private FakeClock clock; + private MaintenanceController controller; private MesosCallbackHandler handler; @Before public void setUp() { - storageUtil = new StorageTestUtil(this); shutdownCommand = createMock(Command.class); statusHandler = createMock(TaskStatusHandler.class); offerManager = createMock(OfferManager.class); eventSink = createMock(EventSink.class); statsProvider = new FakeStatsProvider(); - + driver = createMock(Driver.class); + clock = new FakeClock(); + controller = createMock(MaintenanceController.class); createHandler(false); } @@ -169,7 +191,11 @@ public class MesosCallbackHandlerTest extends EasyMockTest { eventSink, MoreExecutors.directExecutor(), injectedLog, - statsProvider); + statsProvider, + driver, + clock, + controller, + DRAIN_THRESHOLD); } @@ -427,4 +453,27 @@ public class MesosCallbackHandlerTest 
extends EasyMockTest { handler.handleMessage(EXECUTOR_ID, AGENT_ID); } + + @Test + public void testInverseOfferInTheFuture() { + driver.acceptInverseOffer(OFFER_ID, FILTER); + + control.replay(); + + handler.handleInverseOffer(ImmutableList.of(INVERSE_OFFER)); + assertEquals(1L, statsProvider.getLongValue("scheduler_inverse_offers")); + } + + @Test + public void testInverseOfferWithinThreshold() { + clock.advance(Amount.of(4L, Time.MINUTES)); + + driver.acceptInverseOffer(OFFER_ID, FILTER); + controller.drainForInverseOffer(INVERSE_OFFER); + + control.replay(); + + handler.handleInverseOffer(ImmutableList.of(INVERSE_OFFER)); + assertEquals(1L, statsProvider.getLongValue("scheduler_inverse_offers")); + } } http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/test/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverServiceTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverServiceTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverServiceTest.java index a6c28bb..a72bd4d 100644 --- a/src/test/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverServiceTest.java +++ b/src/test/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverServiceTest.java @@ -180,6 +180,33 @@ public class VersionedSchedulerDriverServiceTest extends EasyMockTest { assertEquals(accept.getAccept().getOperationsList(), ImmutableList.of()); } + @Test + public void testAcceptInverseOffer() { + expectStart(); + + expect(storage.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID)); + + Capture<Call> acceptCapture = createCapture(); + mesos.send(capture(acceptCapture)); + expectLastCall().once(); + + control.replay(); + driverService.startAsync().awaitRunning(); + driverService.registered(new PubsubEvent.DriverRegistered()); + + driverService.acceptInverseOffer(OFFER_ID, FILTER); + + 
assertTrue(acceptCapture.hasCaptured()); + Call accept = acceptCapture.getValue(); + assertEquals(accept.getFrameworkId().getValue(), FRAMEWORK_ID); + assertEquals(accept.getType(), Call.Type.ACCEPT_INVERSE_OFFERS); + assertEquals(accept.getAcceptInverseOffers().getFilters(), FILTER); + assertEquals( + accept.getAcceptInverseOffers().getInverseOfferIdsList(), + ImmutableList.of(OFFER_ID) + ); + } + private void expectStart() { storage.expectOperations(); expect(storage.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID)); http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java index 49d4e82..d7addc0 100644 --- a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java @@ -41,6 +41,8 @@ import org.apache.mesos.v1.Protos; import org.apache.mesos.v1.Protos.Filters; import org.apache.mesos.v1.Protos.Offer.Operation; import org.apache.mesos.v1.Protos.TaskInfo; +import org.apache.mesos.v1.Protos.TimeInfo; +import org.apache.mesos.v1.Protos.Unavailability; import org.junit.Before; import org.junit.Test; @@ -62,6 +64,7 @@ import static org.junit.Assert.fail; public class OfferManagerImplTest extends EasyMockTest { private static final Amount<Long, Time> RETURN_DELAY = Amount.of(1L, Time.DAYS); + private static final Amount<Long, Time> ONE_HOUR = Amount.of(1L, Time.HOURS); private static final String HOST_A = "HOST_A"; private static final IHostAttributes HOST_ATTRIBUTES_A = IHostAttributes.build(new HostAttributes().setMode(NONE).setHost(HOST_A)); @@ -115,7 +118,35 @@ public class OfferManagerImplTest extends EasyMockTest { } @Test - public void 
testOffersSorted() throws Exception { + public void testOffersSortedByUnavailability() throws Exception { + clock.advance(Amount.of(1L, Time.HOURS)); + + HostOffer hostOfferB = setUnavailability(OFFER_B, clock.nowMillis()); + Long offerCStartTime = clock.nowMillis() + ONE_HOUR.as(Time.MILLISECONDS); + HostOffer hostOfferC = setUnavailability(OFFER_C, offerCStartTime); + + driver.declineOffer(OFFER_B.getOffer().getId(), OFFER_FILTER); + driver.declineOffer(OFFER_A.getOffer().getId(), OFFER_FILTER); + driver.declineOffer(OFFER_C.getOffer().getId(), OFFER_FILTER); + + control.replay(); + + offerManager.addOffer(hostOfferB); + offerManager.addOffer(OFFER_A); + offerManager.addOffer(hostOfferC); + + List<HostOffer> actual = ImmutableList.copyOf(offerManager.getOffers()); + + assertEquals( + // hostOfferC has a further away start time, so it should be preferred. + ImmutableList.of(OFFER_A, hostOfferC, hostOfferB), + actual); + + clock.advance(RETURN_DELAY); + } + + @Test + public void testOffersSortedByMaintenance() throws Exception { // Ensures that non-DRAINING offers are preferred - the DRAINING offer would be tried last. 
HostOffer offerA = setMode(OFFER_A, DRAINING); @@ -337,6 +368,14 @@ public class OfferManagerImplTest extends EasyMockTest { assertEquals(0L, statsProvider.getLongValue(OUTSTANDING_OFFERS)); } + private static HostOffer setUnavailability(HostOffer offer, Long startMs) { + Unavailability unavailability = Unavailability.newBuilder() + .setStart(TimeInfo.newBuilder().setNanoseconds(startMs * 1000L)).build(); + return new HostOffer( + offer.getOffer().toBuilder().setUnavailability(unavailability).build(), + offer.getAttributes()); + } + private static HostOffer setMode(HostOffer offer, MaintenanceMode mode) { return new HostOffer( offer.getOffer(), http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java index 02bfc51..d310f8d 100644 --- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java +++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java @@ -25,7 +25,9 @@ import com.google.common.collect.Sets; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Data; +import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.common.util.testing.FakeClock; import org.apache.aurora.gen.AssignedTask; import org.apache.aurora.gen.Attribute; import org.apache.aurora.gen.ExecutorConfig; @@ -98,12 +100,14 @@ public class PreemptionVictimFilterTest extends EasyMockTest { private static final String HOST_ATTRIBUTE = "host"; private static final String OFFER = "offer"; private static final Optional<HostOffer> NO_OFFER = Optional.absent(); + private static final Amount<Long, 
Time> UNAVAILABLITY_THRESHOLD = Amount.of(1L, Time.MINUTES); private StorageTestUtil storageUtil; private SchedulingFilter schedulingFilter; private FakeStatsProvider statsProvider; private PreemptorMetrics preemptorMetrics; private TierManager tierManager; + private FakeClock clock; @Before public void setUp() { @@ -112,6 +116,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest { statsProvider = new FakeStatsProvider(); preemptorMetrics = new PreemptorMetrics(new CachedCounters(statsProvider)); tierManager = createMock(TierManager.class); + clock = new FakeClock(); } private Optional<ImmutableSet<PreemptionVictim>> runFilter( @@ -267,7 +272,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest { // Ensures a production task can preempt 2 tasks on the same host. @Test public void testProductionPreemptingManyNonProduction() throws Exception { - schedulingFilter = new SchedulingFilterImpl(); + schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock); ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); setResource(a1, CPUS, 1.0); setResource(a1, RAM_MB, 512.0); @@ -295,7 +300,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest { // Ensures we select the minimal number of tasks to preempt @Test public void testMinimalSetPreempted() throws Exception { - schedulingFilter = new SchedulingFilterImpl(); + schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock); ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); setResource(a1, CPUS, 4.0); setResource(a1, RAM_MB, 4096.0); @@ -330,7 +335,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest { // Ensures a production task *never* preempts a production task from another job. 
@Test public void testProductionJobNeverPreemptsProductionJob() throws Exception { - schedulingFilter = new SchedulingFilterImpl(); + schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock); ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1"); p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024); expectGetTier(p1, PREFERRED_TIER); @@ -350,7 +355,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest { // Ensures that we can preempt if a task + offer can satisfy a pending task. @Test public void testPreemptWithOfferAndTask() throws Exception { - schedulingFilter = new SchedulingFilterImpl(); + schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock); setUpHost(); @@ -375,7 +380,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest { // Ensures revocable offer resources are filtered out. @Test public void testRevocableOfferFiltered() throws Exception { - schedulingFilter = new SchedulingFilterImpl(); + schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock); setUpHost(); @@ -400,7 +405,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest { // Ensures revocable task CPU is not considered for preemption. @Test public void testRevocableVictimsFiltered() throws Exception { - schedulingFilter = new SchedulingFilterImpl(); + schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock); setUpHost(); @@ -425,7 +430,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest { // Ensures revocable victim non-compressible resources are still considered. @Test public void testRevocableVictimRamUsed() throws Exception { - schedulingFilter = new SchedulingFilterImpl(); + schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock); setUpHost(); @@ -452,7 +457,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest { // Ensures we can preempt if two tasks and an offer can satisfy a pending task. 
@Test public void testPreemptWithOfferAndMultipleTasks() throws Exception { - schedulingFilter = new SchedulingFilterImpl(); + schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock); setUpHost(); http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java index ae83dea..0f1d4d4 100644 --- a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java @@ -23,6 +23,8 @@ import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.stats.StatsProvider; import org.apache.aurora.common.testing.easymock.EasyMockTest; import org.apache.aurora.gen.HostAttributes; @@ -44,6 +46,7 @@ import org.apache.aurora.scheduler.storage.entities.IHostStatus; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; import org.apache.aurora.scheduler.testing.FakeStatsProvider; +import org.apache.mesos.v1.Protos; import org.junit.Before; import org.junit.Test; @@ -62,6 +65,33 @@ public class MaintenanceControllerImplTest extends EasyMockTest { private static final String HOST_A = "a"; private static final Set<String> A = ImmutableSet.of(HOST_A); + private static final Protos.OfferID OFFER_ID = Protos.OfferID.newBuilder() + .setValue("offer-id") + .build(); + private static final Protos.AgentID AGENT_ID = Protos.AgentID.newBuilder() + .setValue("agent-id") + .build(); + private 
static final Protos.FrameworkID FRAMEWORK_ID = Protos.FrameworkID.newBuilder() + .setValue("framework-id") + .build(); + private static final Protos.URL AGENT_URL = Protos.URL.newBuilder() + .setAddress(Protos.Address.newBuilder() + .setHostname(HOST_A) + .setPort(5051)) + .setScheme("http") + .build(); + private static final Protos.Unavailability UNAVAILABILITY = Protos.Unavailability.newBuilder() + .setStart(Protos.TimeInfo.newBuilder() + .setNanoseconds(Amount.of(1L, Time.MINUTES).as(Time.NANOSECONDS))) + .build(); + + private static final Protos.InverseOffer INVERSE_OFFER = Protos.InverseOffer.newBuilder() + .setId(OFFER_ID) + .setAgentId(AGENT_ID) + .setUrl(AGENT_URL) + .setFrameworkId(FRAMEWORK_ID) + .setUnavailability(UNAVAILABILITY) + .build(); private StorageTestUtil storageUtil; private StateManager stateManager; @@ -195,6 +225,16 @@ public class MaintenanceControllerImplTest extends EasyMockTest { assertEquals(NONE, maintenance.getMode("unknown")); } + @Test + public void testInverseOfferDrain() { + IScheduledTask task1 = makeTask(HOST_A, "taskA"); + expectFetchTasksByHost(HOST_A, ImmutableSet.of(task1)); + expectTaskDraining(task1); + + control.replay(); + maintenance.drainForInverseOffer(INVERSE_OFFER); + } + private void expectTaskDraining(IScheduledTask task) { expect(stateManager.changeState( storageUtil.mutableStoreProvider, http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/test/sh/org/apache/aurora/e2e/generate_mesos_maintenance_schedule.py ---------------------------------------------------------------------- diff --git a/src/test/sh/org/apache/aurora/e2e/generate_mesos_maintenance_schedule.py b/src/test/sh/org/apache/aurora/e2e/generate_mesos_maintenance_schedule.py new file mode 100644 index 0000000..668ff11 --- /dev/null +++ b/src/test/sh/org/apache/aurora/e2e/generate_mesos_maintenance_schedule.py @@ -0,0 +1,43 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in 
compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +# This generates a mesos maintenance schedule for draining the only host in two +# minutes. +import json +import time + +# Get the current time + two minutes and convert it into nanos +start_secs = int(time.time() + 120) +start_ns = start_secs * 10**9 + +machine_id = { + "hostname": "192.168.33.7", + "ip": "192.168.33.7" +} + +unavailability = { + "start": {"nanoseconds": start_ns} + } + +window = { + "machine_ids": [machine_id], + "unavailability": unavailability + } + +schedule = { + "windows": [window] +} + +print json.dumps(schedule) http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/test/sh/org/apache/aurora/e2e/http/http_example.aurora ---------------------------------------------------------------------- diff --git a/src/test/sh/org/apache/aurora/e2e/http/http_example.aurora b/src/test/sh/org/apache/aurora/e2e/http/http_example.aurora index de81792..be43a09 100644 --- a/src/test/sh/org/apache/aurora/e2e/http/http_example.aurora +++ b/src/test/sh/org/apache/aurora/e2e/http/http_example.aurora @@ -134,6 +134,9 @@ jobs = [ name = 'http_example' ).bind(profile=DefaultProfile()), job( + name = 'http_example_maintenance' + ).bind(profile=DefaultProfile()), + job( name = 'http_example_watch_secs', update_config = update_config_watch_secs ).bind(profile=DefaultProfile()), http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh ---------------------------------------------------------------------- diff --git 
a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh index 80b4c54..1a81dc5 100755 --- a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh +++ b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh @@ -79,6 +79,57 @@ test_version() { [[ $(aurora --version 2>&1) = $(cat /vagrant/.auroraversion) ]] } +clear_mesos_maintenance() { + curl http://"$TEST_SLAVE_IP":5050/maintenance/schedule \ + -H "Content-type: application/json" \ + -X POST \ + -d "{}" +} + +test_mesos_maintenance() { + local _cluster=$1 _role=$2 _env=$3 + local _base_config=$4 + local _job=$7 + local _jobkey="$_cluster/$_role/$_env/$_job" + + # Clear any previous maintenance schedules before running this test. + clear_mesos_maintenance + + test_create $_jobkey $_base_config + + echo "Waiting job to enter RUNNING..." + wait_until_task_status $_jobkey "0" "RUNNING" + + # Create the maintenance schedule + MAINTENANCE_SCHEDULE="/tmp/maintenance_schedule.json" + python \ + /vagrant/src/test/sh/org/apache/aurora/e2e/generate_mesos_maintenance_schedule.py > "$MAINTENANCE_SCHEDULE" + echo "Creating maintenance with schedule" + cat $MAINTENANCE_SCHEDULE | jq . + + curl http://"$TEST_SLAVE_IP":5050/maintenance/schedule \ + -H "Content-type: application/json" \ + -X POST \ + -d @"$MAINTENANCE_SCHEDULE" + + trap clear_mesos_maintenance EXIT + + # Posting of a maintenance schedule should not cause the task to drain right + # away. + assert_task_status $_jobkey "0" "RUNNING" + + # When it is drain time, it should be killed. + echo "Waiting for time to drain tasks..." + wait_until_task_status $_jobkey "0" "PENDING" + + clear_mesos_maintenance + + echo "Waiting for drained task to re-launch..." 
+ wait_until_task_status $_jobkey "0" "RUNNING" + + test_kill $_jobkey +} + test_health_check() { [[ $(_curl "$TEST_SLAVE_IP:8081/health") == 'OK' ]] } @@ -167,6 +218,39 @@ assert_update_state() { fi } +assert_task_status() { + local _jobkey=$1 _id=$2 _expected_state=$3 + + local _state=$(aurora job status $_jobkey --write-json | jq -r ".[0].active[$_id].status") + + if [[ $_state != $_expected_state ]]; then + echo "Expected task to be in state $_expected_state, but found $_state" + exit 1 + fi +} + +wait_until_task_status() { + # Poll the task, waiting for it to enter the target state + local _jobkey=$1 _id=$2 _expected_state=$3 + local _state="" + local _success=0 + + for i in $(seq 1 120); do + _state=$(aurora job status $_jobkey --write-json | jq -r ".[0].active[$_id].status") + if [[ $_state == $_expected_state ]]; then + _success=1 + break + else + sleep 1 + fi + done + + if [[ "$_success" -ne "1" ]]; then + echo "Task did not transition to $_expected_state within two minutes." + exit 1 + fi +} + test_update() { local _jobkey=$1 _config=$2 _cluster=$3 shift 3 @@ -514,6 +598,7 @@ TEST_CLUSTER=devcluster TEST_ROLE=vagrant TEST_ENV=test TEST_JOB=http_example +TEST_MAINTENANCE_JOB=http_example_maintenance TEST_JOB_WATCH_SECS=http_example_watch_secs TEST_JOB_REVOCABLE=http_example_revocable TEST_JOB_GPU=http_example_gpu @@ -539,6 +624,8 @@ BASE_ARGS=( TEST_JOB_ARGS=("${BASE_ARGS[@]}" "$TEST_JOB") +TEST_MAINTENANCE_JOB_ARGS=("${BASE_ARGS[@]}" "$TEST_MAINTENANCE_JOB") + TEST_JOB_WATCH_SECS_ARGS=("${BASE_ARGS[@]}" "$TEST_JOB_WATCH_SECS") TEST_JOB_REVOCABLE_ARGS=("${BASE_ARGS[@]}" "$TEST_JOB_REVOCABLE") @@ -578,6 +665,8 @@ test_http_example "${TEST_JOB_ARGS[@]}" test_http_example "${TEST_JOB_WATCH_SECS_ARGS[@]}" test_health_check +test_mesos_maintenance "${TEST_MAINTENANCE_JOB_ARGS[@]}" + test_http_example_basic "${TEST_JOB_REVOCABLE_ARGS[@]}" test_http_example_basic "${TEST_JOB_GPU_ARGS[@]}"
