Repository: aurora
Updated Branches:
  refs/heads/master 33acb899b -> c32f14c75


Support Mesos Maintenance

This adds support for Mesos Maintenance per the design doc[1].

Per the design the scheduler gains another parameter,
`unavailability_threshold`. With this threshold the scheduler does the
following:

1. Accept all inverse offers from Mesos.
2. Drain when accepting an inverse offer if the unavailability starts within the
   threshold.
3. Veto any offers with unavailability starting within the threshold.
4. Penalize offers that have unavailability information.

For readability and safety the time based code uses the new `java.time` package
in Java 8, primarily relying on the `Instant` class.

[1]: 
https://docs.google.com/document/d/1Z7dFAm6I1nrBE9S5WHw0D0LApBumkIbHrk0-ceoD2YI/edit#heading=h.n5tvzjaj9llx

Testing Done:
e2e tests

Bugs closed: AURORA-1904

Reviewed at https://reviews.apache.org/r/57717/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/c32f14c7
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/c32f14c7
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/c32f14c7

Branch: refs/heads/master
Commit: c32f14c75e7f8620aedb12d38aee8ca1d2427724
Parents: 33acb89
Author: Zameer Manji <[email protected]>
Authored: Thu Mar 23 14:17:40 2017 -0700
Committer: Zameer Manji <[email protected]>
Committed: Thu Mar 23 14:17:40 2017 -0700

----------------------------------------------------------------------
 .../org/apache/aurora/common/util/Clock.java    | 17 +++-
 .../aurora/common/util/testing/FakeClock.java   |  6 ++
 examples/vagrant/upstart/aurora-scheduler.conf  |  3 +-
 .../aurora/benchmark/fakes/FakeDriver.java      |  5 ++
 .../org/apache/aurora/scheduler/HostOffer.java  | 11 +++
 .../aurora/scheduler/base/Conversions.java      | 17 +++-
 .../scheduler/filter/SchedulingFilter.java      | 19 ++++-
 .../scheduler/filter/SchedulingFilterImpl.java  | 39 ++++++++-
 .../apache/aurora/scheduler/mesos/Driver.java   |  8 ++
 .../scheduler/mesos/MesosCallbackHandler.java   | 72 ++++++++++++++--
 .../scheduler/mesos/SchedulerDriverService.java |  5 ++
 .../mesos/VersionedMesosSchedulerImpl.java      |  8 +-
 .../mesos/VersionedSchedulerDriverService.java  | 18 ++++
 .../aurora/scheduler/offers/OfferManager.java   | 24 ++++--
 .../aurora/scheduler/offers/OfferSettings.java  |  2 -
 .../aurora/scheduler/offers/OffersModule.java   | 42 +++++++++
 .../preemptor/PreemptionVictimFilter.java       |  9 +-
 .../scheduler/state/MaintenanceController.java  | 76 +++++++++++++----
 .../aurora/scheduler/state/TaskAssigner.java    |  5 +-
 .../filter/SchedulingFilterImplTest.java        | 58 ++++++++++++-
 .../mesos/MesosCallbackHandlerTest.java         | 55 +++++++++++-
 .../VersionedSchedulerDriverServiceTest.java    | 27 ++++++
 .../scheduler/offers/OfferManagerImplTest.java  | 41 ++++++++-
 .../preemptor/PreemptionVictimFilterTest.java   | 21 +++--
 .../state/MaintenanceControllerImplTest.java    | 40 +++++++++
 .../e2e/generate_mesos_maintenance_schedule.py  | 43 ++++++++++
 .../apache/aurora/e2e/http/http_example.aurora  |  3 +
 .../sh/org/apache/aurora/e2e/test_end_to_end.sh | 89 ++++++++++++++++++++
 28 files changed, 696 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/commons/src/main/java/org/apache/aurora/common/util/Clock.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/util/Clock.java 
b/commons/src/main/java/org/apache/aurora/common/util/Clock.java
index 5c4ced1..825a8c5 100644
--- a/commons/src/main/java/org/apache/aurora/common/util/Clock.java
+++ b/commons/src/main/java/org/apache/aurora/common/util/Clock.java
@@ -13,6 +13,8 @@
  */
 package org.apache.aurora.common.util;
 
+import java.time.Instant;
+
 /**
  * An abstraction of the system clock.
  * @author John Sirois
@@ -21,7 +23,6 @@ public interface Clock {
   // TODO(zmanji): Consider replacing this with java.time.Clock
   /**
    * A clock that returns the the actual time reported by the system.
-   * This clock is guaranteed to be serializable.
    */
   Clock SYSTEM_CLOCK = new Clock() {
     @Override public long nowMillis() {
@@ -33,6 +34,9 @@ public interface Clock {
     @Override public void waitFor(long millis) throws InterruptedException {
       Thread.sleep(millis);
     }
+    @Override public Instant nowInstant() {
+      return Instant.now();
+    }
   };
 
   /**
@@ -44,7 +48,7 @@ public interface Clock {
   long nowMillis();
 
   /**
-   * Returns the current time in nanoseconds.  Should be used only for 
relative timing.
+   * Returns the current time in nanoseconds. Should be used only for relative 
timing.
    * See {@code System.nanoTime()} for tips on using the value returned here.
    *
    * @return A measure of the current time in nanoseconds.
@@ -59,5 +63,12 @@ public interface Clock {
    * @throws InterruptedException if this wait was interrupted
    */
   void waitFor(long millis) throws InterruptedException;
-}
 
+  /**
+   * Returns the current time as an java.time.Instant.
+   *
+   * @return the Instant representing the current time.
+   * @see Instant#now()
+   */
+  Instant nowInstant();
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/commons/src/main/java/org/apache/aurora/common/util/testing/FakeClock.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/util/testing/FakeClock.java 
b/commons/src/main/java/org/apache/aurora/common/util/testing/FakeClock.java
index 104f2c6..d1eec2b 100644
--- a/commons/src/main/java/org/apache/aurora/common/util/testing/FakeClock.java
+++ b/commons/src/main/java/org/apache/aurora/common/util/testing/FakeClock.java
@@ -13,6 +13,7 @@
  */
 package org.apache.aurora.common.util.testing;
 
+import java.time.Instant;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Preconditions;
@@ -76,4 +77,9 @@ public class FakeClock implements Clock {
   public void waitFor(long millis) {
     advance(Amount.of(millis, Time.MILLISECONDS));
   }
+
+  @Override
+  public Instant nowInstant() {
+    return Instant.ofEpochMilli(nowMillis());
+  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/examples/vagrant/upstart/aurora-scheduler.conf
----------------------------------------------------------------------
diff --git a/examples/vagrant/upstart/aurora-scheduler.conf 
b/examples/vagrant/upstart/aurora-scheduler.conf
index 31fa036..63fcc87 100644
--- a/examples/vagrant/upstart/aurora-scheduler.conf
+++ b/examples/vagrant/upstart/aurora-scheduler.conf
@@ -55,4 +55,5 @@ exec bin/aurora-scheduler \
   -allow_gpu_resource=true \
   -allow_container_volumes=true \
   -offer_filter_duration=0secs \
-  -mesos_driver=V1_DRIVER
+  -mesos_driver=V1_DRIVER \
+  -unavailability_threshold=1mins

http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java 
b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java
index 45f59c0..2f47a13 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java
@@ -33,6 +33,11 @@ public class FakeDriver extends AbstractIdleService 
implements Driver {
   }
 
   @Override
+  public void acceptInverseOffer(Protos.OfferID offerID, Protos.Filters 
filter) {
+    // no-op
+  }
+
+  @Override
   public void declineOffer(Protos.OfferID offerId, Protos.Filters filters) {
     // no-op
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/HostOffer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/HostOffer.java 
b/src/main/java/org/apache/aurora/scheduler/HostOffer.java
index 23f0600..bc40d07 100644
--- a/src/main/java/org/apache/aurora/scheduler/HostOffer.java
+++ b/src/main/java/org/apache/aurora/scheduler/HostOffer.java
@@ -13,13 +13,16 @@
  */
 package org.apache.aurora.scheduler;
 
+import java.time.Instant;
 import java.util.Objects;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.base.Optional;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 
+import org.apache.aurora.scheduler.base.Conversions;
 import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 
@@ -61,6 +64,14 @@ public class HostOffer {
     return resourceBagCache.getUnchecked(tierInfo);
   }
 
+  public Optional<Instant> getUnavailabilityStart() {
+    if (offer.hasUnavailability()) {
+      return Optional.of(Conversions.getStart(offer.getUnavailability()));
+    } else {
+      return Optional.absent();
+    }
+  }
+
   @Override
   public boolean equals(Object o) {
     if (!(o instanceof HostOffer)) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/base/Conversions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/Conversions.java 
b/src/main/java/org/apache/aurora/scheduler/base/Conversions.java
index 8295216..33cc012 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/Conversions.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/Conversions.java
@@ -13,9 +13,11 @@
  */
 package org.apache.aurora.scheduler.base;
 
+import java.time.Instant;
 import java.util.Collection;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
@@ -33,11 +35,13 @@ import 
org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.mesos.v1.Protos;
 import org.apache.mesos.v1.Protos.Offer;
 import org.apache.mesos.v1.Protos.TaskState;
+import org.apache.mesos.v1.Protos.Unavailability;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Collection of utility functions to convert mesos protobuf types to internal 
thrift types.
+ * Collection of utility functions to convert mesos protobuf types to internal 
thrift types or
+ * Java types.
  */
 public final class Conversions {
 
@@ -152,4 +156,15 @@ public final class Conversions {
         .transform(ATTRIBUTE_NAME)
         
.anyMatch(Predicates.equalTo(ConfigurationManager.DEDICATED_ATTRIBUTE));
   }
+
+  /**
+   * Converts the start of an Unavailability proto to an Instant.
+   *
+   * @param unavailability Unavailability information from Mesos.
+   * @return The java.time.Instant of the start.
+   */
+  public static Instant getStart(Unavailability unavailability) {
+    long ns = unavailability.getStart().getNanoseconds();
+    return Instant.ofEpochMilli(TimeUnit.NANOSECONDS.toMillis(ns));
+  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java 
b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
index bb1a960..36608a9 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
@@ -13,10 +13,13 @@
  */
 package org.apache.aurora.scheduler.filter;
 
+import java.time.Instant;
 import java.util.Objects;
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
+import com.google.common.base.Optional;
 
 import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.storage.entities.IConstraint;
@@ -248,10 +251,17 @@ public interface SchedulingFilter {
   class UnusedResource {
     private final ResourceBag offer;
     private final IHostAttributes attributes;
+    private final Optional<Instant> unavailabilityStart;
 
+    @VisibleForTesting
     public UnusedResource(ResourceBag offer, IHostAttributes attributes) {
+      this(offer, attributes, Optional.absent());
+    }
+
+    public UnusedResource(ResourceBag offer, IHostAttributes attributes, 
Optional<Instant> start) {
       this.offer = offer;
       this.attributes = attributes;
+      this.unavailabilityStart = start;
     }
 
     public ResourceBag getResourceBag() {
@@ -262,6 +272,10 @@ public interface SchedulingFilter {
       return attributes;
     }
 
+    public Optional<Instant> getUnavailabilityStart() {
+      return unavailabilityStart;
+    }
+
     @Override
     public boolean equals(Object o) {
       if (!(o instanceof UnusedResource)) {
@@ -270,12 +284,13 @@ public interface SchedulingFilter {
 
       UnusedResource other = (UnusedResource) o;
       return Objects.equals(offer, other.offer)
-          && Objects.equals(attributes, other.attributes);
+          && Objects.equals(attributes, other.attributes)
+          && Objects.equals(unavailabilityStart, other.unavailabilityStart);
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(offer, attributes);
+      return Objects.hash(offer, attributes, unavailabilityStart);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java 
b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
index 60097d9..df51d4c 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
@@ -13,9 +13,11 @@
  */
 package org.apache.aurora.scheduler.filter;
 
+import java.time.Instant;
 import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.Set;
+import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
@@ -24,15 +26,21 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
 
 import org.apache.aurora.common.inject.TimedInterceptor.Timed;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.util.Clock;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.TaskConstraint;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
+import org.apache.aurora.scheduler.offers.OffersModule.UnavailabilityThreshold;
 import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.storage.entities.IAttribute;
 import org.apache.aurora.scheduler.storage.entities.IConstraint;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 
+import static java.util.Objects.requireNonNull;
+
 import static org.apache.aurora.gen.MaintenanceMode.DRAINED;
 import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
 import static 
org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE;
@@ -42,6 +50,15 @@ import static 
org.apache.aurora.scheduler.configuration.ConfigurationManager.DED
  * fulfilled, and that tasks are allowed to run on the given machine.
  */
 public class SchedulingFilterImpl implements SchedulingFilter {
+  private final Amount<Long, Time> unavailabilityThreshold;
+  private final Clock clock;
+
+  @Inject
+  public SchedulingFilterImpl(@UnavailabilityThreshold Amount<Long, Time> 
threshold, Clock clock) {
+    this.unavailabilityThreshold = requireNonNull(threshold);
+    this.clock = requireNonNull(clock);
+  }
+
   private static final Set<MaintenanceMode> VETO_MODES = EnumSet.of(DRAINING, 
DRAINED);
 
   @VisibleForTesting
@@ -103,12 +120,24 @@ public class SchedulingFilterImpl implements 
SchedulingFilter {
     return Optional.absent();
   }
 
-  private Optional<Veto> getMaintenanceVeto(MaintenanceMode mode) {
+  private Optional<Veto> getAuroraMaintenanceVeto(MaintenanceMode mode) {
     return VETO_MODES.contains(mode)
         ? Optional.of(Veto.maintenance(mode.toString().toLowerCase()))
         : Optional.absent();
   }
 
+  private Optional<Veto> getMesosMaintenanceVeto(Optional<Instant> 
unavailabilityStart) {
+    if (unavailabilityStart.isPresent()) {
+      Instant start = unavailabilityStart.get();
+      Instant drainTime = 
start.minusMillis(unavailabilityThreshold.as(Time.MILLISECONDS));
+
+      if (clock.nowInstant().isAfter(drainTime)) {
+        return 
Optional.of(Veto.maintenance(DRAINING.toString().toLowerCase()));
+      }
+    }
+    return Optional.absent();
+  }
+
   private boolean isDedicated(IHostAttributes attributes) {
     return Iterables.any(
         attributes.getAttributes(),
@@ -130,11 +159,17 @@ public class SchedulingFilterImpl implements 
SchedulingFilter {
     }
 
     // 2. Host maintenance check.
-    Optional<Veto> maintenanceVeto = 
getMaintenanceVeto(resource.getAttributes().getMode());
+    Optional<Veto> maintenanceVeto = 
getAuroraMaintenanceVeto(resource.getAttributes().getMode());
     if (maintenanceVeto.isPresent()) {
       return maintenanceVeto.asSet();
     }
 
+    Optional<Veto> mesosMaintenanceVeto =
+        getMesosMaintenanceVeto(resource.getUnavailabilityStart());
+    if (mesosMaintenanceVeto.isPresent()) {
+      return mesosMaintenanceVeto.asSet();
+    }
+
     // 3. Value and limit constraint check.
     Optional<Veto> constraintVeto = getConstraintVeto(
         request.getConstraints(),

http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java 
b/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java
index 71547ce..be5dd45 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java
@@ -42,6 +42,14 @@ public interface Driver extends Service {
   void acceptOffers(OfferID offerId, Collection<Operation> operations, Filters 
filter);
 
   /**
+   * Accepts an inverse offer.
+   *
+   * @param offerID ID of the inverse offer.
+   * @param filter offer filter to apply.
+   */
+  void acceptInverseOffer(OfferID offerID, Filters filter);
+
+  /**
    * Declines a resource offer.
    *
    * @param offerId ID of the offer to decline.

http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java 
b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
index 801551b..800edfa 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
@@ -15,6 +15,8 @@ package org.apache.aurora.scheduler.mesos;
 
 import java.lang.annotation.Retention;
 import java.lang.annotation.Target;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
 import java.util.List;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
@@ -26,19 +28,27 @@ import com.google.common.base.Function;
 import com.google.common.base.Optional;
 
 import org.apache.aurora.common.application.Lifecycle;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.common.util.Clock;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.TaskStatusHandler;
+import org.apache.aurora.scheduler.base.Conversions;
 import org.apache.aurora.scheduler.base.SchedulerException;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.offers.OffersModule;
+import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.mesos.v1.Protos.AgentID;
 import org.apache.mesos.v1.Protos.ExecutorID;
+import org.apache.mesos.v1.Protos.Filters;
 import org.apache.mesos.v1.Protos.FrameworkID;
+import org.apache.mesos.v1.Protos.InverseOffer;
 import org.apache.mesos.v1.Protos.MasterInfo;
 import org.apache.mesos.v1.Protos.Offer;
 import org.apache.mesos.v1.Protos.OfferID;
@@ -70,9 +80,9 @@ public interface MesosCallbackHandler {
   void handleUpdate(TaskStatus status);
   void handleLostAgent(AgentID agentId);
   void handleLostExecutor(ExecutorID executorID, AgentID slaveID, int status);
+  void handleInverseOffer(List<InverseOffer> offers);
 
   class MesosCallbackHandlerImpl implements MesosCallbackHandler {
-
     private final TaskStatusHandler taskStatusHandler;
     private final OfferManager offerManager;
     private final Storage storage;
@@ -80,11 +90,16 @@ public interface MesosCallbackHandler {
     private final EventSink eventSink;
     private final Executor executor;
     private final Logger log;
+    private final Driver driver;
+    private final Clock clock;
+    private final MaintenanceController maintenanceController;
+    private final Amount<Long, Time> unavailabilityThreshold;
 
     private final AtomicLong offersRescinded;
     private final AtomicLong slavesLost;
     private final AtomicLong reRegisters;
-    private final AtomicLong offersRecieved;
+    private final AtomicLong offersReceived;
+    private final AtomicLong inverseOffersReceived;
     private final AtomicLong disconnects;
     private final AtomicLong executorsLost;
 
@@ -114,7 +129,11 @@ public interface MesosCallbackHandler {
         OfferManager offerManager,
         EventSink eventSink,
         @SchedulerExecutor Executor executor,
-        StatsProvider statsProvider) {
+        StatsProvider statsProvider,
+        Driver driver,
+        Clock clock,
+        MaintenanceController controller,
+        @OffersModule.UnavailabilityThreshold Amount<Long, Time> 
unavailabilityThreshold) {
 
       this(
           storage,
@@ -124,7 +143,11 @@ public interface MesosCallbackHandler {
           eventSink,
           executor,
           LoggerFactory.getLogger(MesosCallbackHandlerImpl.class),
-          statsProvider);
+          statsProvider,
+          driver,
+          clock,
+          controller,
+          unavailabilityThreshold);
     }
 
     @VisibleForTesting
@@ -136,7 +159,11 @@ public interface MesosCallbackHandler {
         EventSink eventSink,
         Executor executor,
         Logger log,
-        StatsProvider statsProvider) {
+        StatsProvider statsProvider,
+        Driver driver,
+        Clock clock,
+        MaintenanceController maintenanceController,
+        Amount<Long, Time> unavailabilityThreshold) {
 
       this.storage = requireNonNull(storage);
       this.lifecycle = requireNonNull(lifecycle);
@@ -145,11 +172,16 @@ public interface MesosCallbackHandler {
       this.eventSink = requireNonNull(eventSink);
       this.executor = requireNonNull(executor);
       this.log = requireNonNull(log);
+      this.driver = requireNonNull(driver);
+      this.clock = requireNonNull(clock);
+      this.maintenanceController = requireNonNull(maintenanceController);
+      this.unavailabilityThreshold = requireNonNull(unavailabilityThreshold);
 
       this.offersRescinded = statsProvider.makeCounter("offers_rescinded");
       this.slavesLost = statsProvider.makeCounter("slaves_lost");
       this.reRegisters = 
statsProvider.makeCounter("scheduler_framework_reregisters");
-      this.offersRecieved = 
statsProvider.makeCounter("scheduler_resource_offers");
+      this.offersReceived = 
statsProvider.makeCounter("scheduler_resource_offers");
+      this.inverseOffersReceived = 
statsProvider.makeCounter("scheduler_inverse_offers");
       this.disconnects = 
statsProvider.makeCounter("scheduler_framework_disconnects");
       this.executorsLost = 
statsProvider.makeCounter("scheduler_lost_executors");
     }
@@ -186,7 +218,7 @@ public interface MesosCallbackHandler {
                 
AttributeStore.Util.mergeOffer(storeProvider.getAttributeStore(), offer);
             storeProvider.getAttributeStore().saveHostAttributes(attributes);
             log.debug("Received offer: {}", offer);
-            offersRecieved.incrementAndGet();
+            offersReceived.incrementAndGet();
             offerManager.addOffer(new HostOffer(offer, attributes));
           }
         });
@@ -202,7 +234,7 @@ public interface MesosCallbackHandler {
 
     @Override
     public void handleRescind(OfferID offerId) {
-      log.info("Offer rescinded: " + offerId);
+      log.info("Offer rescinded: {}", offerId.getValue());
       offerManager.cancelOffer(offerId);
       offersRescinded.incrementAndGet();
     }
@@ -284,5 +316,29 @@ public interface MesosCallbackHandler {
         executorsLost.incrementAndGet();
       }
     }
+
+    @Override
+    public void handleInverseOffer(List<InverseOffer> offers) {
+      if (offers.isEmpty()) {
+        return;
+      }
+
+      executor.execute(() -> {
+        for (InverseOffer offer: offers) {
+          inverseOffersReceived.incrementAndGet();
+          log.debug("Received inverse offer: {}", offer);
+          // Use the default filter for accepting inverse offers.
+          driver.acceptInverseOffer(offer.getId(), 
Filters.newBuilder().build());
+
+          Instant start = Conversions.getStart(offer.getUnavailability());
+          Instant drainTime = start
+              .minus(unavailabilityThreshold.as(Time.MILLISECONDS), 
ChronoUnit.MILLIS);
+
+          if (clock.nowInstant().isAfter(drainTime)) {
+            maintenanceController.drainForInverseOffer(offer);
+          }
+        }
+      });
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java 
b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
index 4a604f5..e0221f8 100644
--- 
a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
+++ 
b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
@@ -140,6 +140,11 @@ class SchedulerDriverService extends AbstractIdleService 
implements Driver {
   }
 
   @Override
+  public void acceptInverseOffer(Protos.OfferID offerID, Protos.Filters 
filter) {
+    throw new UnsupportedOperationException("SchedulerDriver does not support 
inverse offers");
+  }
+
+  @Override
   public void declineOffer(Protos.OfferID offerId, Protos.Filters filter) {
     ensureRunning();
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java
 
b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java
index a519c7d..67d356a 100644
--- 
a/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java
+++ 
b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java
@@ -13,15 +13,12 @@
  */
 package org.apache.aurora.scheduler.mesos;
 
-import java.util.List;
 import javax.inject.Inject;
 
-import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import com.google.common.collect.Lists;
 
 import org.apache.aurora.common.inject.TimedInterceptor;
 import org.apache.aurora.scheduler.stats.CachedCounters;
@@ -147,10 +144,7 @@ public class VersionedMesosSchedulerImpl implements 
Scheduler {
         break;
 
       case INVERSE_OFFERS:
-        List<Protos.InverseOffer> offers = 
event.getInverseOffers().getInverseOffersList();
-        String ids = Joiner.on(",").join(
-            Lists.transform(offers, input -> input.getId().getValue()));
-        LOG.warn("Ignoring inverse offers: {}", ids);
+        
handler.handleInverseOffer(event.getInverseOffers().getInverseOffersList());
         break;
 
       case RESCIND_INVERSE_OFFER:

http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java
 
b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java
index f1326ea..5e86504 100644
--- 
a/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java
+++ 
b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java
@@ -137,6 +137,24 @@ class VersionedSchedulerDriverService extends 
AbstractIdleService
   }
 
   @Override
+  public void acceptInverseOffer(OfferID offerID, Filters filter) {
+    whenRegistered(() -> {
+      LOG.info("Accepting Inverse Offer {}", offerID.getValue());
+
+      Futures.getUnchecked(mesosFuture).send(
+          Call.newBuilder()
+              .setFrameworkId(getFrameworkId())
+              .setType(Call.Type.ACCEPT_INVERSE_OFFERS)
+              .setAcceptInverseOffers(
+                  Call.AcceptInverseOffers.newBuilder()
+                    .addInverseOfferIds(offerID)
+                    .setFilters(filter)
+              )
+          .build());
+    });
+  }
+
+  @Override
   public void declineOffer(OfferID offerId, Filters filter) {
     whenRegistered(() -> {
       LOG.info("Declining offer {}", offerId.getValue());

http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java 
b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
index 8c000cb..78255e6 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
@@ -13,6 +13,7 @@
  */
 package org.apache.aurora.scheduler.offers;
 
+import java.time.Instant;
 import java.util.Comparator;
 import java.util.Map;
 import java.util.Set;
@@ -22,7 +23,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.HashMultimap;
@@ -37,7 +37,6 @@ import com.google.common.eventbus.Subscribe;
 import org.apache.aurora.common.inject.TimedInterceptor.Timed;
 import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.stats.StatsProvider;
-import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
 import org.apache.aurora.scheduler.async.DelayExecutor;
@@ -272,15 +271,22 @@ public interface OfferManager extends EventSubscriber {
      * the different indices used and their consistency.
      */
     private static class HostOffers {
+      private static final Ordering<HostOffer> AURORA_MAINTENANCE_COMPARATOR =
+          Ordering.explicit(NONE, SCHEDULED, DRAINING, DRAINED)
+              .onResultOf(offer -> offer.getAttributes().getMode());
+      // We should not prefer offers from agents that are scheduled to become 
unavailable.
+      // We should also sort the unavailability start to prefer agents that 
are starting
+      // maintenance later.
+      private static final Ordering<HostOffer> MESOS_MAINTENANCE_COMPARATOR =
+          Ordering
+            .natural()
+            .reverse()
+            .onResultOf(o -> o.getUnavailabilityStart().or(Instant.MAX));
+
       private static final Comparator<HostOffer> PREFERENCE_COMPARATOR =
           // Currently, the only preference is based on host maintenance 
status.
-          Ordering.explicit(NONE, SCHEDULED, DRAINING, DRAINED)
-              .onResultOf(new Function<HostOffer, MaintenanceMode>() {
-                @Override
-                public MaintenanceMode apply(HostOffer offer) {
-                  return offer.getAttributes().getMode();
-                }
-              })
+          AURORA_MAINTENANCE_COMPARATOR
+              .compound(MESOS_MAINTENANCE_COMPARATOR)
               .compound(Ordering.arbitrary());
 
       private final Set<HostOffer> offers = new 
ConcurrentSkipListSet<>(PREFERENCE_COMPARATOR);

http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java 
b/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java
index e16e36e..adf7f33 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java
@@ -13,7 +13,6 @@
  */
 package org.apache.aurora.scheduler.offers;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
 
 import org.apache.aurora.common.quantity.Amount;
@@ -24,7 +23,6 @@ import static java.util.Objects.requireNonNull;
 /**
  * Settings required to create an OfferManager.
  */
-@VisibleForTesting
 public class OfferSettings {
 
   private final Amount<Long, Time> offerFilterDuration;

http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java 
b/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java
index 202cae9..317a2d2 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java
@@ -13,10 +13,14 @@
  */
 package org.apache.aurora.scheduler.offers;
 
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import javax.inject.Qualifier;
 import javax.inject.Singleton;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.PrivateModule;
+import com.google.inject.TypeLiteral;
 
 import org.apache.aurora.common.args.Arg;
 import org.apache.aurora.common.args.CmdLine;
@@ -25,11 +29,19 @@ import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.util.Random;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
 
 /**
  * Binding module for resource offer management.
  */
 public class OffersModule extends AbstractModule {
+  private static final Logger LOG = 
LoggerFactory.getLogger(OffersModule.class);
 
   @CmdLine(name = "min_offer_hold_time",
       help = "Minimum amount of time to hold a resource offer before 
declining.")
@@ -50,8 +62,37 @@ public class OffersModule extends AbstractModule {
   private static final Arg<Amount<Long, Time>> OFFER_FILTER_DURATION =
       Arg.create(Amount.of(5L, Time.SECONDS));
 
+  @CmdLine(name = "unavailability_threshold",
+      help = "Threshold time, when running tasks should be drained from a 
host, before a host "
+          + "becomes unavailable. Should be greater than min_offer_hold_time + 
"
+          + "offer_hold_jitter_window.")
+  private static final Arg<Amount<Long, Time>> UNAVAILABILITY_THRESHOLD =
+      Arg.create(Amount.of(6L, Time.MINUTES));
+
+  /**
+   * Binding annotation for the threshold to veto tasks with unavailability.
+   */
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  public @interface UnavailabilityThreshold { }
+
   @Override
   protected void configure() {
+    // Warn (without failing startup) when the configured unavailability threshold is smaller
+    // than the combined offer hold time; the log message spells out the resulting race.
+    long offerHoldTime = OFFER_HOLD_JITTER_WINDOW.get().as(Time.SECONDS)
+        + MIN_OFFER_HOLD_TIME.get().as(Time.SECONDS);
+    if (UNAVAILABILITY_THRESHOLD.get().as(Time.SECONDS) < offerHoldTime) {
+      // Each continuation literal starts with a space so the joined message keeps its word
+      // boundaries ("and offer_hold_jitter_window", "and draining").
+      LOG.warn("unavailability_threshold ({}) is less than the sum of min_offer_hold_time ({}) and"
+          + " offer_hold_jitter_window ({}). This creates risks of races between launching and"
+          + " draining",
+          UNAVAILABILITY_THRESHOLD.get(),
+          MIN_OFFER_HOLD_TIME.get(),
+          OFFER_HOLD_JITTER_WINDOW.get());
+    }
+
+    bind(new TypeLiteral<Amount<Long, Time>>() { })
+        .annotatedWith(UnavailabilityThreshold.class)
+        .toInstance(UNAVAILABILITY_THRESHOLD.get());
+
     install(new PrivateModule() {
       @Override
       protected void configure() {
@@ -65,6 +106,7 @@ public class OffersModule extends AbstractModule {
         bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class);
         bind(OfferManager.OfferManagerImpl.class).in(Singleton.class);
         expose(OfferManager.class);
+        expose(OfferSettings.class);
       }
     });
     PubsubEventModule.bindSubscriber(binder(), OfferManager.class);

http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
 
b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
index ba49e7a..5ed578c 100644
--- 
a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
+++ 
b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
@@ -13,6 +13,7 @@
  */
 package org.apache.aurora.scheduler.preemptor;
 
+import java.time.Instant;
 import java.util.List;
 import java.util.Set;
 
@@ -214,8 +215,14 @@ public interface PreemptionVictimFilter {
       for (PreemptionVictim victim : sortedVictims) {
         toPreemptTasks.add(victim);
         totalResource = totalResource.add(victimToResources.apply(victim));
+
+        Optional<Instant> unavailability = Optional.absent();
+        if (offer.isPresent()) {
+          unavailability = offer.get().getUnavailabilityStart();
+        }
+
         Set<Veto> vetoes = schedulingFilter.filter(
-            new UnusedResource(totalResource, attributes.get()),
+            new UnusedResource(totalResource, attributes.get(), 
unavailability),
             new ResourceRequest(
                 pendingTask,
                 
ResourceManager.bagFromResources(pendingTask.getResources()).add(overhead),

http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java 
b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
index 574efc9..1edb252 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
@@ -42,6 +42,7 @@ import 
org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IHostStatus;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.mesos.v1.Protos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,7 +56,6 @@ import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
  * All state-changing functions return their results.  Additionally, all 
state-changing functions
  * will ignore requests to change state of unknown hosts and subsequently omit 
these hosts from
  * return values.
- * TODO(wfarner): Convert use of HostStatus in this API to IHostStatus 
(immutable).
  */
 public interface MaintenanceController {
 
@@ -80,6 +80,14 @@ public interface MaintenanceController {
   Set<IHostStatus> drain(Set<String> hosts);
 
   /**
+   * Drain tasks defined by the inverse offer.
+   * This method doesn't set any host attributes.
+   *
+   * @param inverseOffer the inverse offer to use.
+   */
+  void drainForInverseOffer(Protos.InverseOffer inverseOffer);
+
+  /**
    * Fetches the current maintenance mode of {@code host}.
    *
    * @param host Host to fetch state for.
@@ -120,28 +128,40 @@ public interface MaintenanceController {
       this.batchWorker = requireNonNull(batchWorker);
     }
 
+    /**
+     * Requests a DRAINING state change for every active task on {@code host}.
+     *
+     * @param host Hostname whose active tasks are looked up.
+     * @param store Mutable store provider used for the task query and the state changes.
+     * @return IDs of the active tasks found on the host; empty when there were none.
+     */
+    private Set<String> drainTasksOnHost(String host, MutableStoreProvider store) {
+      Set<String> taskIds =
+          FluentIterable.from(store.getTaskStore().fetchTasks(Query.slaveScoped(host).active()))
+              .transform(Tasks::id)
+              .toSet();
+
+      if (taskIds.isEmpty()) {
+        LOG.info("No tasks to drain on host: {}", host);
+        return taskIds;
+      }
+
+      LOG.info("Draining tasks: {} on host: {}", taskIds, host);
+      for (String id : taskIds) {
+        stateManager.changeState(
+            store, id, Optional.absent(), ScheduleStatus.DRAINING, DRAINING_MESSAGE);
+      }
+      return taskIds;
+    }
+
     private Set<IHostStatus> watchDrainingTasks(MutableStoreProvider store, 
Set<String> hosts) {
       LOG.info("Hosts to drain: " + hosts);
       Set<String> emptyHosts = Sets.newHashSet();
       for (String host : hosts) {
+        Set<String> drainedTasks = drainTasksOnHost(host, store);
         // If there are no tasks on the host, immediately transition to 
DRAINED.
-        Query.Builder query = Query.slaveScoped(host).active();
-        Set<String> activeTasks = 
FluentIterable.from(store.getTaskStore().fetchTasks(query))
-            .transform(Tasks::id)
-            .toSet();
-        if (activeTasks.isEmpty()) {
-          LOG.info("No tasks to drain for host: " + host);
+        if (drainedTasks.isEmpty()) {
           emptyHosts.add(host);
-        } else {
-          LOG.info("Draining tasks: {} on host: {}", activeTasks, host);
-          for (String taskId : activeTasks) {
-            stateManager.changeState(
-                store,
-                taskId,
-                Optional.absent(),
-                ScheduleStatus.DRAINING,
-                DRAINING_MESSAGE);
-          }
         }
       }
 
@@ -195,6 +215,28 @@ public interface MaintenanceController {
       return storage.write(store -> watchDrainingTasks(store, hosts));
     }
 
+    /** Returns the hostname from the inverse offer's URL address, or absent if none is set. */
+    private Optional<String> getHostname(Protos.InverseOffer offer) {
+      boolean hostnameSet = offer.getUrl().getAddress().hasHostname();
+      return hostnameSet
+          ? Optional.of(offer.getUrl().getAddress().getHostname())
+          : Optional.<String>absent();
+    }
+
+    @Override
+    public void drainForInverseOffer(Protos.InverseOffer offer) {
+      // Tasks are located by hostname because TaskStore does not allow for querying by agent id.
+      Optional<String> hostname = getHostname(offer);
+
+      if (!hostname.isPresent()) {
+        // Without a hostname there is nothing to query; report it and leave tasks untouched.
+        LOG.error("Unable to drain tasks on agent {} because "
+            + "no hostname attached to inverse offer {}.", offer.getAgentId(), offer.getId());
+        return;
+      }
+
+      String host = hostname.get();
+      storage.write(storeProvider -> drainTasksOnHost(host, storeProvider));
+    }
+
     private static final Function<IHostAttributes, String> HOST_NAME =
         IHostAttributes::getHost;
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java 
b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index da378e8..a177b30 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -171,7 +171,10 @@ public interface TaskAssigner {
         }
 
         Set<Veto> vetoes = filter.filter(
-            new UnusedResource(offer.getResourceBag(tierInfo), 
offer.getAttributes()),
+            new UnusedResource(
+                offer.getResourceBag(tierInfo),
+                offer.getAttributes(),
+                offer.getUnavailabilityStart()),
             resourceRequest);
 
         if (vetoes.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
 
b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
index 1d7f9f4..5915e47 100644
--- 
a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
+++ 
b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
@@ -13,15 +13,20 @@
  */
 package org.apache.aurora.scheduler.filter;
 
+import java.time.Instant;
 import java.util.Arrays;
 import java.util.Set;
 
+import com.google.common.base.Optional;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.common.util.testing.FakeClock;
 import org.apache.aurora.gen.Attribute;
 import org.apache.aurora.gen.Constraint;
 import org.apache.aurora.gen.ExecutorConfig;
@@ -87,11 +92,13 @@ public class SchedulingFilterImplTest extends EasyMockTest {
       mesosScalar(DISK_MB, DEFAULT_DISK),
       mesosRange(PORTS, 80, 81)));
 
+  private static final Amount<Long, Time> UNAVAILABILITY_THRESHOLD = 
Amount.of(2L, Time.MINUTES);
+  private final FakeClock clock = new FakeClock();
   private SchedulingFilter defaultFilter;
 
   @Before
   public void setUp() {
-    defaultFilter = new SchedulingFilterImpl();
+    defaultFilter = new SchedulingFilterImpl(UNAVAILABILITY_THRESHOLD, clock);
   }
 
   @Test
@@ -217,6 +224,55 @@ public class SchedulingFilterImplTest extends EasyMockTest 
{
   }
 
   @Test
+  public void testDrainingMesosMaintenance() {
+    // Start the test at minute 8
+    clock.advance(Amount.of(8L, Time.MINUTES));
+
+    // The agent will go down at minute 9
+    // this is less than the threshold of two minutes
+
+    Instant start = Instant.ofEpochMilli(Amount.of(9L, 
Time.MINUTES).as(Time.MILLISECONDS));
+
+    ITaskConfig task = makeTask();
+    UnusedResource unusedResource = new UnusedResource(
+        DEFAULT_OFFER,
+        hostAttributes(HOST_A),
+        Optional.of(start));
+    ResourceRequest request = new ResourceRequest(task, bag(task), empty());
+
+    control.replay();
+
+    assertEquals(
+        ImmutableSet.of(Veto.maintenance("draining")),
+        defaultFilter.filter(unusedResource, request));
+  }
+
+  @Test
+  public void testNotVetoingWithMesosMaintenace() {
+    // Start the test at minute 8
+    clock.advance(Amount.of(8L, Time.MINUTES));
+
+    // The agent will go down at minute 100
+    // this is greater than the threshold of two minutes
+
+    Instant start = Instant.ofEpochMilli(Amount.of(100L, 
Time.MINUTES).as(Time.MILLISECONDS));
+
+    ITaskConfig task = makeTask();
+    UnusedResource unusedResource = new UnusedResource(
+        DEFAULT_OFFER,
+        hostAttributes(HOST_A),
+        Optional.of(start));
+    ResourceRequest request = new ResourceRequest(task, bag(task), empty());
+
+    control.replay();
+
+    assertEquals(
+        ImmutableSet.of(),
+        defaultFilter.filter(unusedResource, request));
+
+  }
+
+  @Test
   public void testHostDrainingForMaintenance() {
     control.replay();
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java 
b/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
index 80f631e..52041b5 100644
--- 
a/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
+++ 
b/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
@@ -21,7 +21,10 @@ import com.google.common.util.concurrent.MoreExecutors;
 
 import org.apache.aurora.common.application.Lifecycle;
 import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.common.util.testing.FakeClock;
 import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.TaskStatusHandler;
@@ -30,6 +33,7 @@ import org.apache.aurora.scheduler.base.SchedulerException;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
@@ -131,26 +135,44 @@ public class MesosCallbackHandlerTest extends 
EasyMockTest {
       .setReason(Protos.TaskStatus.Reason.REASON_RECONCILIATION)
       .build();
 
+  private static final Amount<Long, Time> DRAIN_THRESHOLD = Amount.of(2L, 
Time.MINUTES);
+
+  private static final Protos.InverseOffer INVERSE_OFFER = 
Protos.InverseOffer.newBuilder()
+      .setAgentId(AGENT_ID)
+      .setFrameworkId(FRAMEWORK)
+      .setId(OFFER_ID)
+      .setUnavailability(Protos.Unavailability.newBuilder()
+          .setStart(Protos.TimeInfo.newBuilder()
+              .setNanoseconds(300000000000L)
+          ))
+      .build();
+
+  private static final Protos.Filters FILTER = 
Protos.Filters.newBuilder().build();
+
   private StorageTestUtil storageUtil;
   private Command shutdownCommand;
   private TaskStatusHandler statusHandler;
   private OfferManager offerManager;
   private EventSink eventSink;
   private FakeStatsProvider statsProvider;
+  private Driver driver;
   private Logger injectedLog;
+  private FakeClock clock;
+  private MaintenanceController controller;
 
   private MesosCallbackHandler handler;
 
   @Before
   public void setUp() {
-
     storageUtil = new StorageTestUtil(this);
     shutdownCommand = createMock(Command.class);
     statusHandler = createMock(TaskStatusHandler.class);
     offerManager = createMock(OfferManager.class);
     eventSink = createMock(EventSink.class);
     statsProvider = new FakeStatsProvider();
-
+    driver = createMock(Driver.class);
+    clock = new FakeClock();
+    controller = createMock(MaintenanceController.class);
     createHandler(false);
   }
 
@@ -169,7 +191,11 @@ public class MesosCallbackHandlerTest extends EasyMockTest 
{
         eventSink,
         MoreExecutors.directExecutor(),
         injectedLog,
-        statsProvider);
+        statsProvider,
+        driver,
+        clock,
+        controller,
+        DRAIN_THRESHOLD);
 
   }
 
@@ -427,4 +453,27 @@ public class MesosCallbackHandlerTest extends EasyMockTest 
{
 
     handler.handleMessage(EXECUTOR_ID, AGENT_ID);
   }
+
+  @Test
+  public void testInverseOfferInTheFuture() {
+    // The clock is not advanced here. INVERSE_OFFER's unavailability starts at 300 s (given as
+    // nanoseconds), which is past the 2-minute DRAIN_THRESHOLD -- assuming FakeClock starts at
+    // time zero (TODO confirm). Only the accept call is expected; no drain is recorded.
+    driver.acceptInverseOffer(OFFER_ID, FILTER);
+
+    control.replay();
+
+    handler.handleInverseOffer(ImmutableList.of(INVERSE_OFFER));
+    assertEquals(1L, statsProvider.getLongValue("scheduler_inverse_offers"));
+  }
+
+  @Test
+  public void testInverseOfferWithinThreshold() {
+    // After advancing 4 minutes, the offer's 300 s unavailability start is presumably 1 minute
+    // away (assuming FakeClock starts at time zero -- TODO confirm), i.e. inside the 2-minute
+    // DRAIN_THRESHOLD.
+    clock.advance(Amount.of(4L, Time.MINUTES));
+
+    // Both the accept call and a drain for the inverse offer are expected.
+    driver.acceptInverseOffer(OFFER_ID, FILTER);
+    controller.drainForInverseOffer(INVERSE_OFFER);
+
+    control.replay();
+
+    handler.handleInverseOffer(ImmutableList.of(INVERSE_OFFER));
+    assertEquals(1L, statsProvider.getLongValue("scheduler_inverse_offers"));
+  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/test/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverServiceTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverServiceTest.java
 
b/src/test/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverServiceTest.java
index a6c28bb..a72bd4d 100644
--- 
a/src/test/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverServiceTest.java
+++ 
b/src/test/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverServiceTest.java
@@ -180,6 +180,33 @@ public class VersionedSchedulerDriverServiceTest extends 
EasyMockTest {
     assertEquals(accept.getAccept().getOperationsList(), ImmutableList.of());
   }
 
+  @Test
+  public void testAcceptInverseOffer() {
+    expectStart();
+
+    
expect(storage.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID));
+
+    Capture<Call> acceptCapture = createCapture();
+    mesos.send(capture(acceptCapture));
+    expectLastCall().once();
+
+    control.replay();
+    driverService.startAsync().awaitRunning();
+    driverService.registered(new PubsubEvent.DriverRegistered());
+
+    driverService.acceptInverseOffer(OFFER_ID, FILTER);
+
+    assertTrue(acceptCapture.hasCaptured());
+    Call accept = acceptCapture.getValue();
+    assertEquals(accept.getFrameworkId().getValue(), FRAMEWORK_ID);
+    assertEquals(accept.getType(), Call.Type.ACCEPT_INVERSE_OFFERS);
+    assertEquals(accept.getAcceptInverseOffers().getFilters(), FILTER);
+    assertEquals(
+        accept.getAcceptInverseOffers().getInverseOfferIdsList(),
+        ImmutableList.of(OFFER_ID)
+    );
+  }
+
   private void expectStart() {
     storage.expectOperations();
     
expect(storage.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID));

http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java 
b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
index 49d4e82..d7addc0 100644
--- a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
@@ -41,6 +41,8 @@ import org.apache.mesos.v1.Protos;
 import org.apache.mesos.v1.Protos.Filters;
 import org.apache.mesos.v1.Protos.Offer.Operation;
 import org.apache.mesos.v1.Protos.TaskInfo;
+import org.apache.mesos.v1.Protos.TimeInfo;
+import org.apache.mesos.v1.Protos.Unavailability;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -62,6 +64,7 @@ import static org.junit.Assert.fail;
 public class OfferManagerImplTest extends EasyMockTest {
 
   private static final Amount<Long, Time> RETURN_DELAY = Amount.of(1L, 
Time.DAYS);
+  private static final Amount<Long, Time> ONE_HOUR = Amount.of(1L, Time.HOURS);
   private static final String HOST_A = "HOST_A";
   private static final IHostAttributes HOST_ATTRIBUTES_A =
       IHostAttributes.build(new 
HostAttributes().setMode(NONE).setHost(HOST_A));
@@ -115,7 +118,35 @@ public class OfferManagerImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testOffersSorted() throws Exception {
+  public void testOffersSortedByUnavailability() throws Exception {
+    clock.advance(Amount.of(1L, Time.HOURS));
+
+    HostOffer hostOfferB = setUnavailability(OFFER_B, clock.nowMillis());
+    Long offerCStartTime = clock.nowMillis() + ONE_HOUR.as(Time.MILLISECONDS);
+    HostOffer hostOfferC = setUnavailability(OFFER_C, offerCStartTime);
+
+    driver.declineOffer(OFFER_B.getOffer().getId(), OFFER_FILTER);
+    driver.declineOffer(OFFER_A.getOffer().getId(), OFFER_FILTER);
+    driver.declineOffer(OFFER_C.getOffer().getId(), OFFER_FILTER);
+
+    control.replay();
+
+    offerManager.addOffer(hostOfferB);
+    offerManager.addOffer(OFFER_A);
+    offerManager.addOffer(hostOfferC);
+
+    List<HostOffer> actual = ImmutableList.copyOf(offerManager.getOffers());
+
+    assertEquals(
+        // hostOfferC has a further away start time, so it should be preferred.
+        ImmutableList.of(OFFER_A, hostOfferC, hostOfferB),
+        actual);
+
+    clock.advance(RETURN_DELAY);
+  }
+
+  @Test
+  public void testOffersSortedByMaintenance() throws Exception {
     // Ensures that non-DRAINING offers are preferred - the DRAINING offer 
would be tried last.
 
     HostOffer offerA = setMode(OFFER_A, DRAINING);
@@ -337,6 +368,14 @@ public class OfferManagerImplTest extends EasyMockTest {
     assertEquals(0L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
   }
 
+  /**
+   * Builds a copy of {@code offer} whose Mesos offer carries an unavailability starting at
+   * {@code startMs}.
+   *
+   * @param offer Offer to copy; its host attributes are reused as-is.
+   * @param startMs Unavailability start time in milliseconds (callers pass clock millis).
+   * @return A new {@link HostOffer} with the unavailability set.
+   */
+  private static HostOffer setUnavailability(HostOffer offer, Long startMs) {
+    // TimeInfo takes nanoseconds: 1 ms = 1,000,000 ns (a factor of 1000 would give microseconds).
+    Unavailability unavailability = Unavailability.newBuilder()
+        .setStart(TimeInfo.newBuilder().setNanoseconds(startMs * 1_000_000L)).build();
+    return new HostOffer(
+        offer.getOffer().toBuilder().setUnavailability(unavailability).build(),
+        offer.getAttributes());
+  }
+
   private static HostOffer setMode(HostOffer offer, MaintenanceMode mode) {
     return new HostOffer(
         offer.getOffer(),

http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
 
b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
index 02bfc51..d310f8d 100644
--- 
a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
+++ 
b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
@@ -25,7 +25,9 @@ import com.google.common.collect.Sets;
 
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Data;
+import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.common.util.testing.FakeClock;
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.Attribute;
 import org.apache.aurora.gen.ExecutorConfig;
@@ -98,12 +100,14 @@ public class PreemptionVictimFilterTest extends 
EasyMockTest {
   private static final String HOST_ATTRIBUTE = "host";
   private static final String OFFER = "offer";
   private static final Optional<HostOffer> NO_OFFER = Optional.absent();
+  private static final Amount<Long, Time> UNAVAILABLITY_THRESHOLD = 
Amount.of(1L, Time.MINUTES);
 
   private StorageTestUtil storageUtil;
   private SchedulingFilter schedulingFilter;
   private FakeStatsProvider statsProvider;
   private PreemptorMetrics preemptorMetrics;
   private TierManager tierManager;
+  private FakeClock clock;
 
   @Before
   public void setUp() {
@@ -112,6 +116,7 @@ public class PreemptionVictimFilterTest extends 
EasyMockTest {
     statsProvider = new FakeStatsProvider();
     preemptorMetrics = new PreemptorMetrics(new CachedCounters(statsProvider));
     tierManager = createMock(TierManager.class);
+    clock = new FakeClock();
   }
 
   private Optional<ImmutableSet<PreemptionVictim>> runFilter(
@@ -267,7 +272,7 @@ public class PreemptionVictimFilterTest extends 
EasyMockTest {
   // Ensures a production task can preempt 2 tasks on the same host.
   @Test
   public void testProductionPreemptingManyNonProduction() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl();
+    schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, 
clock);
     ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
     setResource(a1, CPUS, 1.0);
     setResource(a1, RAM_MB, 512.0);
@@ -295,7 +300,7 @@ public class PreemptionVictimFilterTest extends 
EasyMockTest {
   // Ensures we select the minimal number of tasks to preempt
   @Test
   public void testMinimalSetPreempted() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl();
+    schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, 
clock);
     ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
     setResource(a1, CPUS, 4.0);
     setResource(a1, RAM_MB, 4096.0);
@@ -330,7 +335,7 @@ public class PreemptionVictimFilterTest extends 
EasyMockTest {
   // Ensures a production task *never* preempts a production task from another 
job.
   @Test
   public void testProductionJobNeverPreemptsProductionJob() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl();
+    schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, 
clock);
     ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1");
     p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
     expectGetTier(p1, PREFERRED_TIER);
@@ -350,7 +355,7 @@ public class PreemptionVictimFilterTest extends 
EasyMockTest {
   // Ensures that we can preempt if a task + offer can satisfy a pending task.
   @Test
   public void testPreemptWithOfferAndTask() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl();
+    schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, 
clock);
 
     setUpHost();
 
@@ -375,7 +380,7 @@ public class PreemptionVictimFilterTest extends 
EasyMockTest {
   // Ensures revocable offer resources are filtered out.
   @Test
   public void testRevocableOfferFiltered() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl();
+    schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, 
clock);
 
     setUpHost();
 
@@ -400,7 +405,7 @@ public class PreemptionVictimFilterTest extends 
EasyMockTest {
   // Ensures revocable task CPU is not considered for preemption.
   @Test
   public void testRevocableVictimsFiltered() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl();
+    schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, 
clock);
 
     setUpHost();
 
@@ -425,7 +430,7 @@ public class PreemptionVictimFilterTest extends 
EasyMockTest {
   // Ensures revocable victim non-compressible resources are still considered.
   @Test
   public void testRevocableVictimRamUsed() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl();
+    schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, 
clock);
 
     setUpHost();
 
@@ -452,7 +457,7 @@ public class PreemptionVictimFilterTest extends 
EasyMockTest {
   // Ensures we can preempt if two tasks and an offer can satisfy a pending 
task.
   @Test
   public void testPreemptWithOfferAndMultipleTasks() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl();
+    schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, 
clock);
 
     setUpHost();
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
 
b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
index ae83dea..0f1d4d4 100644
--- 
a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
+++ 
b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
@@ -23,6 +23,8 @@ import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.gen.HostAttributes;
@@ -44,6 +46,7 @@ import 
org.apache.aurora.scheduler.storage.entities.IHostStatus;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.apache.mesos.v1.Protos;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -62,6 +65,33 @@ public class MaintenanceControllerImplTest extends 
EasyMockTest {
 
   private static final String HOST_A = "a";
   private static final Set<String> A = ImmutableSet.of(HOST_A);
+  private static final Protos.OfferID OFFER_ID = Protos.OfferID.newBuilder()
+      .setValue("offer-id")
+      .build();
+  private static final Protos.AgentID AGENT_ID = Protos.AgentID.newBuilder()
+      .setValue("agent-id")
+      .build();
+  private static final Protos.FrameworkID FRAMEWORK_ID = 
Protos.FrameworkID.newBuilder()
+      .setValue("framework-id")
+      .build();
+  private static final Protos.URL AGENT_URL = Protos.URL.newBuilder()
+      .setAddress(Protos.Address.newBuilder()
+          .setHostname(HOST_A)
+          .setPort(5051))
+      .setScheme("http")
+      .build();
+  private static final Protos.Unavailability UNAVAILABILITY = 
Protos.Unavailability.newBuilder()
+      .setStart(Protos.TimeInfo.newBuilder()
+          .setNanoseconds(Amount.of(1L, Time.MINUTES).as(Time.NANOSECONDS)))
+      .build();
+  // Inverse offer whose agent URL names HOST_A and whose unavailability start is 1 minute in nanos.
+  private static final Protos.InverseOffer INVERSE_OFFER = 
Protos.InverseOffer.newBuilder()
+      .setId(OFFER_ID)
+      .setAgentId(AGENT_ID)
+      .setUrl(AGENT_URL)
+      .setFrameworkId(FRAMEWORK_ID)
+      .setUnavailability(UNAVAILABILITY)
+      .build();
 
   private StorageTestUtil storageUtil;
   private StateManager stateManager;
@@ -195,6 +225,16 @@ public class MaintenanceControllerImplTest extends 
EasyMockTest {
     assertEquals(NONE, maintenance.getMode("unknown"));
   }
 
+  @Test
+  public void testInverseOfferDrain() {
+    IScheduledTask task1 = makeTask(HOST_A, "taskA");
+    expectFetchTasksByHost(HOST_A, ImmutableSet.of(task1));
+    expectTaskDraining(task1);
+    // Tasks are looked up by HOST_A, the hostname carried in INVERSE_OFFER's agent URL.
+    control.replay();
+    maintenance.drainForInverseOffer(INVERSE_OFFER);
+  }
+
   private void expectTaskDraining(IScheduledTask task) {
     expect(stateManager.changeState(
         storageUtil.mutableStoreProvider,

http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/test/sh/org/apache/aurora/e2e/generate_mesos_maintenance_schedule.py
----------------------------------------------------------------------
diff --git 
a/src/test/sh/org/apache/aurora/e2e/generate_mesos_maintenance_schedule.py 
b/src/test/sh/org/apache/aurora/e2e/generate_mesos_maintenance_schedule.py
new file mode 100644
index 0000000..668ff11
--- /dev/null
+++ b/src/test/sh/org/apache/aurora/e2e/generate_mesos_maintenance_schedule.py
@@ -0,0 +1,43 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+# This generates a mesos maintenance schedule for draining the only host in two
+# minutes.
+import json
+import time
+
+# Get the current time + two minutes and convert it into nanos
+start_secs = int(time.time() + 120)
+start_ns = start_secs * 10**9
+
+machine_id = {
+        "hostname": "192.168.33.7",
+        "ip": "192.168.33.7"
+}
+
+unavailability = {
+        "start": {"nanoseconds": start_ns}
+}
+
+window = {
+        "machine_ids": [machine_id],
+        "unavailability": unavailability
+}
+
+schedule = {
+        "windows": [window]
+}
+
+print(json.dumps(schedule))

http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/test/sh/org/apache/aurora/e2e/http/http_example.aurora
----------------------------------------------------------------------
diff --git a/src/test/sh/org/apache/aurora/e2e/http/http_example.aurora 
b/src/test/sh/org/apache/aurora/e2e/http/http_example.aurora
index de81792..be43a09 100644
--- a/src/test/sh/org/apache/aurora/e2e/http/http_example.aurora
+++ b/src/test/sh/org/apache/aurora/e2e/http/http_example.aurora
@@ -134,6 +134,9 @@ jobs = [
     name = 'http_example'
   ).bind(profile=DefaultProfile()),
   job(
+    name = 'http_example_maintenance'
+  ).bind(profile=DefaultProfile()),
+  job(
     name = 'http_example_watch_secs',
     update_config = update_config_watch_secs
   ).bind(profile=DefaultProfile()),

http://git-wip-us.apache.org/repos/asf/aurora/blob/c32f14c7/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
----------------------------------------------------------------------
diff --git a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh 
b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
index 80b4c54..1a81dc5 100755
--- a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
+++ b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
@@ -79,6 +79,57 @@ test_version() {
   [[ $(aurora --version 2>&1) = $(cat /vagrant/.auroraversion) ]]
 }
 
+clear_mesos_maintenance() {  # POST "{}" to /maintenance/schedule on $TEST_SLAVE_IP:5050
+  curl -X POST \
+    -H "Content-type: application/json" \
+    -d "{}" \
+    http://"$TEST_SLAVE_IP":5050/maintenance/schedule
+}
+
+test_mesos_maintenance() {
+  # Posting a schedule must not drain the task at once; it drains later, then re-launches.
+  local _cluster=$1 _role=$2 _env=$3 _base_config=$4
+  local _job=$7  # arguments 5 and 6 are not used by this test
+  local _jobkey="$_cluster/$_role/$_env/$_job"
+
+  # Clear any previous maintenance schedules before running this test.
+  clear_mesos_maintenance
+
+  test_create "$_jobkey" "$_base_config"
+
+  echo "Waiting job to enter RUNNING..."
+  wait_until_task_status "$_jobkey" "0" "RUNNING"
+
+  # Create the maintenance schedule
+  MAINTENANCE_SCHEDULE="/tmp/maintenance_schedule.json"
+  python \
+    /vagrant/src/test/sh/org/apache/aurora/e2e/generate_mesos_maintenance_schedule.py > "$MAINTENANCE_SCHEDULE"
+  echo "Creating maintenance with schedule"
+  jq . "$MAINTENANCE_SCHEDULE"
+
+  curl http://"$TEST_SLAVE_IP":5050/maintenance/schedule \
+    -H "Content-type: application/json" \
+    -X POST \
+    -d @"$MAINTENANCE_SCHEDULE"
+  # From here on, make sure the schedule is cleared again whenever the script exits.
+  trap clear_mesos_maintenance EXIT
+
+  # Posting of a maintenance schedule should not cause the task to drain right
+  # away.
+  assert_task_status "$_jobkey" "0" "RUNNING"
+
+  # When it is drain time, it should be killed.
+  echo "Waiting for time to drain tasks..."
+  wait_until_task_status "$_jobkey" "0" "PENDING"
+
+  clear_mesos_maintenance
+
+  echo "Waiting for drained task to re-launch..."
+  wait_until_task_status "$_jobkey" "0" "RUNNING"
+
+  test_kill "$_jobkey"
+}
+
 test_health_check() {
   [[ $(_curl "$TEST_SLAVE_IP:8081/health") == 'OK' ]]
 }
@@ -167,6 +218,39 @@ assert_update_state() {
   fi
 }
 
+assert_task_status() {
+  # Exit with status 1 unless entry $2 of job $1's active task list reports status $3.
+  local _jobkey=$1 _id=$2 _expected_state=$3 _state
+
+  _state=$(aurora job status "$_jobkey" --write-json | jq -r ".[0].active[$_id].status")
+  if [[ "$_state" != "$_expected_state" ]]; then
+    echo "Expected task to be in state $_expected_state, but found $_state"
+    exit 1
+  fi
+}
+
+wait_until_task_status() {
+  # Poll the task (up to 120 tries, sleeping 1s after each miss) until it enters the target state.
+  # Exits the script with status 1 if the target state is never observed.
+  local _jobkey=$1 _id=$2 _expected_state=$3
+  local _state="" _success=0 _attempt
+
+  for _attempt in $(seq 1 120); do
+    _state=$(aurora job status "$_jobkey" --write-json | jq -r ".[0].active[$_id].status")
+    if [[ "$_state" == "$_expected_state" ]]; then
+      _success=1
+      break
+    else
+      sleep 1
+    fi
+  done
+
+  if [[ "$_success" -ne "1" ]]; then
+    echo "Task did not transition to $_expected_state within two minutes."
+    exit 1
+  fi
+}
+
 test_update() {
   local _jobkey=$1 _config=$2 _cluster=$3
   shift 3
@@ -514,6 +598,7 @@ TEST_CLUSTER=devcluster
 TEST_ROLE=vagrant
 TEST_ENV=test
 TEST_JOB=http_example
+TEST_MAINTENANCE_JOB=http_example_maintenance
 TEST_JOB_WATCH_SECS=http_example_watch_secs
 TEST_JOB_REVOCABLE=http_example_revocable
 TEST_JOB_GPU=http_example_gpu
@@ -539,6 +624,8 @@ BASE_ARGS=(
 
 TEST_JOB_ARGS=("${BASE_ARGS[@]}" "$TEST_JOB")
 
+TEST_MAINTENANCE_JOB_ARGS=("${BASE_ARGS[@]}" "$TEST_MAINTENANCE_JOB")
+
 TEST_JOB_WATCH_SECS_ARGS=("${BASE_ARGS[@]}" "$TEST_JOB_WATCH_SECS")
 
 TEST_JOB_REVOCABLE_ARGS=("${BASE_ARGS[@]}" "$TEST_JOB_REVOCABLE")
@@ -578,6 +665,8 @@ test_http_example "${TEST_JOB_ARGS[@]}"
 test_http_example "${TEST_JOB_WATCH_SECS_ARGS[@]}"
 test_health_check
 
+test_mesos_maintenance "${TEST_MAINTENANCE_JOB_ARGS[@]}"
+
 test_http_example_basic "${TEST_JOB_REVOCABLE_ARGS[@]}"
 
 test_http_example_basic "${TEST_JOB_GPU_ARGS[@]}"

Reply via email to