Repository: aurora
Updated Branches:
  refs/heads/master 2cbaeecce -> 73234d09d


Process rescinds in the same thread pool as offers.

In a production environment, I observed the following:
```
I0606 00:31:32.510 [Thread-77638, MesosCallbackHandler$MesosCallbackHandlerImpl:229] Offer rescinded: 81e04cbd-9bce-41cf-bd94-38c911f255e4-O142359552
I0606 00:31:32.903 [SchedulerImpl-0, MesosCallbackHandler$MesosCallbackHandlerImpl:211] Received offer: 81e04cbd-9bce-41cf-bd94-38c911f255e4-O142359552
I0606 00:31:34.815 [TaskGroupBatchWorker, VersionedSchedulerDriverService:123] Accepting offer 81e04cbd-9bce-41cf-bd94-38c911f255e4-O142359552 with ops [LAUNCH]
```

Notice that the offer rescind was processed before the offer itself. This is
possible because of a race in `MesosCallbackHandlerImpl`: the offer is
processed on the executor (to avoid blocking the callback thread), while the
rescind is handled directly on the callback thread. This means the offer
processing thread (`SchedulerImpl-0`) races against the callback thread
(`Thread-77638`).
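To make the race concrete, here is a minimal, deterministic Java sketch of the
pre-fix behaviour. It is a hypothetical model, not Aurora code:
`RacyCallbackHandler`, the `ConcurrentHashMap` standing in for the offer
manager, and the `CountDownLatch` standing in for the storage lock are all
assumptions made for illustration. The latch holds the offer task on the
executor thread while the rescind runs directly on the caller's thread, which
reproduces the interleaving seen in the log above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the pre-fix handler; not Aurora's MesosCallbackHandlerImpl.
class RacyCallbackHandler {
  // Stand-in for the offer manager: offer id -> offer.
  private final Map<String, String> offers = new ConcurrentHashMap<>();
  // Offers are processed here so the callback thread never blocks.
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  // Stand-in for the storage lock: the offer task waits here until released.
  private final CountDownLatch storageLock = new CountDownLatch(1);

  void handleOffer(String offerId) {
    executor.execute(() -> {
      try {
        storageLock.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
      offers.put(offerId, "offer");
    });
  }

  // Pre-fix behaviour: the rescind runs directly on the calling (callback) thread.
  void handleRescind(String offerId) {
    offers.remove(offerId);
  }

  // Replays the interleaving from the log: rescind handled before the offer is stored.
  static boolean staleOfferSurvives() {
    RacyCallbackHandler handler = new RacyCallbackHandler();
    handler.handleOffer("O1");        // queued; will wait on the simulated storage lock
    handler.handleRescind("O1");      // runs now and finds nothing to remove
    handler.storageLock.countDown();  // lock released; the offer is stored afterwards
    handler.executor.shutdown();
    try {
      handler.executor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return handler.offers.containsKey("O1");  // the rescinded offer lingers
  }
}
```

`RacyCallbackHandler.staleOfferSurvives()` returns `true`: the rescind found
nothing to remove, so the offer stored afterwards stays in the map.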

In normal operation, seconds to minutes pass between an offer and its rescind,
but in clusters that use oversubscription modules an offer can be rescinded
very quickly.

To fix this, we move the rescind processing onto the same executor as the offer
processing, so that offers and rescinds are processed in the order they are
received. Without this fix, the rescinded offer remains in the offer manager and
can later be used to launch a task. That task immediately fails to launch
because the offer is no longer valid.
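The fix can be sketched with the same kind of hypothetical model by queueing
the rescind on the executor instead of running it on the callback thread. The
sketch assumes a single-threaded executor (`Executors.newSingleThreadExecutor()`),
which runs tasks one at a time in submission order; the ordering argument
depends on the executor running tasks serially, and the sketch does not claim
this mirrors how Aurora configures its executor. `OrderedCallbackHandler` is a
hypothetical name.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the post-fix handler; not Aurora's MesosCallbackHandlerImpl.
class OrderedCallbackHandler {
  // Stand-in for the offer manager: offer id -> offer.
  private final Map<String, String> offers = new ConcurrentHashMap<>();
  // Assumption: a single-thread executor, so tasks run one at a time in submission order.
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  // Stand-in for the storage lock that delays the offer task.
  private final CountDownLatch storageLock = new CountDownLatch(1);

  void handleOffer(String offerId) {
    executor.execute(() -> {
      try {
        storageLock.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
      offers.put(offerId, "offer");
    });
  }

  // Post-fix behaviour: the rescind is queued behind any offer already submitted.
  void handleRescind(String offerId) {
    executor.execute(() -> offers.remove(offerId));
  }

  static boolean rescindedOfferRemoved() {
    OrderedCallbackHandler handler = new OrderedCallbackHandler();
    handler.handleOffer("O1");        // queued first
    handler.handleRescind("O1");      // queued second; cannot overtake the offer
    handler.storageLock.countDown();  // offer is stored, then the rescind removes it
    handler.executor.shutdown();
    try {
      handler.executor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return !handler.offers.containsKey("O1");
  }
}
```

`OrderedCallbackHandler.rescindedOfferRemoved()` returns `true`. With a
multi-threaded pool the two tasks could run concurrently on different threads,
so submitting both to the same executor would not by itself guarantee ordering.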

In this patch, I have also added a metric and logging to record when we fail to
remove an offer from the offer manager, and cleaned up the logging so that
operators can see when an offer was received. With this logging, an operator can
grep for the offer ID and see the entire lifecycle of the offer in the
scheduler.
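The cancel-failure accounting described above amounts to checking whether the
removal actually removed something. A minimal sketch, assuming a plain map of
offer ids as the store; `SimpleOfferStore` is hypothetical, and
`java.util.logging` replaces the scheduler's logger only to keep the example
self-contained.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

// Hypothetical stand-in for the offer manager; not Aurora's OfferManagerImpl.
class SimpleOfferStore {
  private static final Logger LOG = Logger.getLogger(SimpleOfferStore.class.getName());

  private final Map<String, String> offers = new ConcurrentHashMap<>();
  // Plays the role of the "offer_cancel_failures" counter.
  final AtomicLong offerCancelFailures = new AtomicLong();

  void addOffer(String offerId) {
    offers.put(offerId, "offer");
  }

  // Returns true only when an offer was actually removed.
  private boolean removeOffer(String offerId) {
    return offers.remove(offerId) != null;
  }

  void cancelOffer(String offerId) {
    if (!removeOffer(offerId)) {
      // Expected rarely (a rescind racing an accept); frequent failures suggest
      // rescinds are being processed before the offers they refer to.
      LOG.warning("Failed to cancel offer: " + offerId);
      offerCancelFailures.incrementAndGet();
    }
  }
}
```

Cancelling a known offer leaves the counter at 0, while cancelling an unknown
offer id increments it, which is the behaviour `testCancelFailure` checks in the
diff below.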

Bugs closed: AURORA-1933

Reviewed at https://reviews.apache.org/r/59853/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/73234d09
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/73234d09
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/73234d09

Branch: refs/heads/master
Commit: 73234d09d803ebc718c0c0ea454c5629714ef8e6
Parents: 2cbaeec
Author: Zameer Manji <[email protected]>
Authored: Tue Jun 6 14:21:22 2017 -0700
Committer: Zameer Manji <[email protected]>
Committed: Tue Jun 6 14:21:22 2017 -0700

----------------------------------------------------------------------
 .../scheduler/mesos/MesosCallbackHandler.java   | 25 +++++++++++++-------
 .../mesos/VersionedSchedulerDriverService.java  |  2 +-
 .../aurora/scheduler/offers/OfferManager.java   | 13 +++++++++-
 .../scheduler/offers/OfferManagerImplTest.java  | 10 ++++++++
 4 files changed, 40 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/73234d09/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
index 5a5281a..772a04c 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
@@ -200,6 +200,9 @@ public interface MesosCallbackHandler {
         return;
       }
 
+      // NOTE: We need to use the executor here to save attributes and store the offer because this
+      // requires the storage lock which can block. We cannot block in the libmesos callback
+      // handler without ill effects.
       executor.execute(() -> {
        // TODO(wfarner): Reconsider the requirements here, augment the task scheduler to skip over
        //                offers when the host attributes cannot be found. (AURORA-137)
@@ -208,7 +211,7 @@ public interface MesosCallbackHandler {
            IHostAttributes attributes =
                AttributeStore.Util.mergeOffer(storeProvider.getAttributeStore(), offer);
             storeProvider.getAttributeStore().saveHostAttributes(attributes);
-            log.debug("Received offer: {}", offer);
+            log.info("Received offer: {}", offer.getId().getValue());
             offersReceived.incrementAndGet();
             offerManager.addOffer(new HostOffer(offer, attributes));
           }
@@ -217,6 +220,19 @@ public interface MesosCallbackHandler {
     }
 
     @Override
+    public void handleRescind(OfferID offerId) {
+      // NOTE: We need to use the executor here to prevent racing against processing the offers.
+      // If the callback thread rescinded the offers directly, it could rescind the offer
+      // before the thread for processing the offer got the storage lock and was able to write.
+      // Therefore we use the executor here to also process the rescind to maintain the ordering.
+      executor.execute(() -> {
+        log.info("Offer rescinded: {}", offerId.getValue());
+        offerManager.cancelOffer(offerId);
+        offersRescinded.incrementAndGet();
+      });
+    }
+
+    @Override
     public void handleDisconnection() {
       log.warn("Framework disconnected.");
       disconnects.incrementAndGet();
@@ -225,13 +241,6 @@ public interface MesosCallbackHandler {
     }
 
     @Override
-    public void handleRescind(OfferID offerId) {
-      log.info("Offer rescinded: {}", offerId.getValue());
-      offerManager.cancelOffer(offerId);
-      offersRescinded.incrementAndGet();
-    }
-
-    @Override
     public void handleMessage(ExecutorID executorID, AgentID agentID) {
       log.warn(
           "Ignoring framework message from {} on {}.",

http://git-wip-us.apache.org/repos/asf/aurora/blob/73234d09/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java
index 5e86504..4609064 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java
@@ -120,7 +120,7 @@ class VersionedSchedulerDriverService extends AbstractIdleService
  public void acceptOffers(OfferID offerId, Collection<Operation> operations, Filters filter) {
     whenRegistered(() -> {
      Collection<Operation.Type> opTypes = Collections2.transform(operations, Operation::getType);
-      LOG.info("Accepting offer {} with ops {}", offerId, opTypes);
+      LOG.info("Accepting offer {} with ops {}", offerId.getValue(), opTypes);
 
       Futures.getUnchecked(mesosFuture).send(
           Call.newBuilder()

http://git-wip-us.apache.org/repos/asf/aurora/blob/73234d09/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
index 17e577b..a55f8ad 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
@@ -145,9 +145,12 @@ public interface OfferManager extends EventSubscriber {
     static final String OUTSTANDING_OFFERS = "outstanding_offers";
     @VisibleForTesting
    static final String STATICALLY_BANNED_OFFERS = "statically_banned_offers_size";
+    @VisibleForTesting
+    static final String OFFER_CANCEL_FAILURES = "offer_cancel_failures";
 
     private final HostOffers hostOffers;
     private final AtomicLong offerRaces;
+    private final AtomicLong offerCancelFailures;
 
     private final Driver driver;
     private final OfferSettings offerSettings;
@@ -166,6 +169,7 @@ public interface OfferManager extends EventSubscriber {
       this.executor = requireNonNull(executor);
      this.hostOffers = new HostOffers(statsProvider, offerSettings.getOfferOrder());
       this.offerRaces = statsProvider.makeCounter(OFFER_ACCEPT_RACES);
+      this.offerCancelFailures = statsProvider.makeCounter(OFFER_CANCEL_FAILURES);
     }
 
     @Override
@@ -210,7 +214,14 @@ public interface OfferManager extends EventSubscriber {
 
     @Override
     public void cancelOffer(final OfferID offerId) {
-      removeFromHostOffers(offerId);
+      boolean success = removeFromHostOffers(offerId);
+      if (!success) {
+        // This will happen rarely when we race to process this rescind against accepting the offer
+        // to launch a task.
+        // If it happens frequently, we are likely processing rescinds before the offer itself.
+        LOG.warn("Failed to cancel offer: {}.", offerId.getValue());
+        this.offerCancelFailures.incrementAndGet();
+      }
     }
 
     private boolean removeFromHostOffers(final OfferID offerId) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/73234d09/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
index 97febf2..be02449 100644
--- a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
@@ -51,6 +51,7 @@ import static org.apache.aurora.gen.MaintenanceMode.NONE;
 import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
 import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
 import static org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl.OFFER_ACCEPT_RACES;
+import static org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl.OFFER_CANCEL_FAILURES;
 import static org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl.OUTSTANDING_OFFERS;
 import static org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl.STATICALLY_BANNED_OFFERS;
 import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
@@ -234,6 +235,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     assertEquals(1L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
 
     offerManager.cancelOffer(OFFER_A_ID);
+    assertEquals(0L, statsProvider.getLongValue(OFFER_CANCEL_FAILURES));
     assertTrue(Iterables.isEmpty(offerManager.getOffers()));
     assertEquals(0L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
 
@@ -361,6 +363,14 @@ public class OfferManagerImplTest extends EasyMockTest {
   }
 
   @Test
+  public void testCancelFailure() throws Exception {
+    control.replay();
+
+    offerManager.cancelOffer(OFFER_A.getOffer().getId());
+    assertEquals(1L, statsProvider.getLongValue(OFFER_CANCEL_FAILURES));
+  }
+
+  @Test
   public void testDeclineOffer() throws Exception {
     driver.declineOffer(OFFER_A.getOffer().getId(), OFFER_FILTER);
     expectLastCall();
