Repository: aurora Updated Branches: refs/heads/master 2cbaeecce -> 73234d09d
Process rescinds in the same thread pool as offers. In a production environment I was able to observe the following: ``` I0606 00:31:32.510 [Thread-77638, MesosCallbackHandler$MesosCallbackHandlerImpl:229] Offer rescinded: 81e04cbd-9bce-41cf-bd94-38c911f255e4-O142359552 I0606 00:31:32.903 [SchedulerImpl-0, MesosCallbackHandler$MesosCallbackHandlerImpl:211] Received offer: 81e04cbd-9bce-41cf-bd94-38c911f255e4-O142359552 I0606 00:31:34.815 [TaskGroupBatchWorker, VersionedSchedulerDriverService:123] Accepting offer 81e04cbd-9bce-41cf-bd94-38c911f255e4-O142359552 with ops [LAUNCH] ``` Notice that the offer rescind was processed before the actual offer. This is possible because there is a race in the `MesosCallbackHandlerImpl`. The offer is processed in the executor (to prevent blocking) and the rescind is handled directly. This means the offer processing thread (`SchedulerImpl-0`) is racing against the callback thread (`Thread-77638`). In normal operation, there will be seconds to minutes between a rescind and an offer, but in some cases an offer can be rescinded very quickly in clusters that use oversubscription modules. To fix this, we move the rescind processing into the same executor as the offer processing to ensure they are processed in the order they are received. Without fixing this, the rescinded offer exists in the offer manager and can be used later to launch a task. This task will immediately fail to launch because the offer is invalid. In this patch, I have also added a metric and logging to record when we fail to remove an offer from the offer manager, and cleaned up the logging to allow operators to see when an offer was received. With this logging, an operator can grep for the offer id and see the entire lifecycle of the offer in the scheduler. 
Bugs closed: AURORA-1933 Reviewed at https://reviews.apache.org/r/59853/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/73234d09 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/73234d09 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/73234d09 Branch: refs/heads/master Commit: 73234d09d803ebc718c0c0ea454c5629714ef8e6 Parents: 2cbaeec Author: Zameer Manji <[email protected]> Authored: Tue Jun 6 14:21:22 2017 -0700 Committer: Zameer Manji <[email protected]> Committed: Tue Jun 6 14:21:22 2017 -0700 ---------------------------------------------------------------------- .../scheduler/mesos/MesosCallbackHandler.java | 25 +++++++++++++------- .../mesos/VersionedSchedulerDriverService.java | 2 +- .../aurora/scheduler/offers/OfferManager.java | 13 +++++++++- .../scheduler/offers/OfferManagerImplTest.java | 10 ++++++++ 4 files changed, 40 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/73234d09/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java index 5a5281a..772a04c 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java @@ -200,6 +200,9 @@ public interface MesosCallbackHandler { return; } + // NOTE: We need to use the executor here to save attributes and store the offer because this + // requires the storage lock which can block. We cannot block in the libmesos callback + // handler without ill effects. 
executor.execute(() -> { // TODO(wfarner): Reconsider the requirements here, augment the task scheduler to skip over // offers when the host attributes cannot be found. (AURORA-137) @@ -208,7 +211,7 @@ public interface MesosCallbackHandler { IHostAttributes attributes = AttributeStore.Util.mergeOffer(storeProvider.getAttributeStore(), offer); storeProvider.getAttributeStore().saveHostAttributes(attributes); - log.debug("Received offer: {}", offer); + log.info("Received offer: {}", offer.getId().getValue()); offersReceived.incrementAndGet(); offerManager.addOffer(new HostOffer(offer, attributes)); } @@ -217,6 +220,19 @@ public interface MesosCallbackHandler { } @Override + public void handleRescind(OfferID offerId) { + // NOTE: We need to use the executor here to prevent racing against processing the offers. + // If the callback thread rescinded the offers directly, it could rescind the offer + // before the thread for processing the offer got the storage lock and was able to write. + // Therefore we use the executor here to also process the rescind to maintain the ordering. 
+ executor.execute(() -> { + log.info("Offer rescinded: {}", offerId.getValue()); + offerManager.cancelOffer(offerId); + offersRescinded.incrementAndGet(); + }); + } + + @Override public void handleDisconnection() { log.warn("Framework disconnected."); disconnects.incrementAndGet(); @@ -225,13 +241,6 @@ public interface MesosCallbackHandler { } @Override - public void handleRescind(OfferID offerId) { - log.info("Offer rescinded: {}", offerId.getValue()); - offerManager.cancelOffer(offerId); - offersRescinded.incrementAndGet(); - } - - @Override public void handleMessage(ExecutorID executorID, AgentID agentID) { log.warn( "Ignoring framework message from {} on {}.", http://git-wip-us.apache.org/repos/asf/aurora/blob/73234d09/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java index 5e86504..4609064 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java @@ -120,7 +120,7 @@ class VersionedSchedulerDriverService extends AbstractIdleService public void acceptOffers(OfferID offerId, Collection<Operation> operations, Filters filter) { whenRegistered(() -> { Collection<Operation.Type> opTypes = Collections2.transform(operations, Operation::getType); - LOG.info("Accepting offer {} with ops {}", offerId, opTypes); + LOG.info("Accepting offer {} with ops {}", offerId.getValue(), opTypes); Futures.getUnchecked(mesosFuture).send( Call.newBuilder() http://git-wip-us.apache.org/repos/asf/aurora/blob/73234d09/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java index 17e577b..a55f8ad 100644 --- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java +++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java @@ -145,9 +145,12 @@ public interface OfferManager extends EventSubscriber { static final String OUTSTANDING_OFFERS = "outstanding_offers"; @VisibleForTesting static final String STATICALLY_BANNED_OFFERS = "statically_banned_offers_size"; + @VisibleForTesting + static final String OFFER_CANCEL_FAILURES = "offer_cancel_failures"; private final HostOffers hostOffers; private final AtomicLong offerRaces; + private final AtomicLong offerCancelFailures; private final Driver driver; private final OfferSettings offerSettings; @@ -166,6 +169,7 @@ public interface OfferManager extends EventSubscriber { this.executor = requireNonNull(executor); this.hostOffers = new HostOffers(statsProvider, offerSettings.getOfferOrder()); this.offerRaces = statsProvider.makeCounter(OFFER_ACCEPT_RACES); + this.offerCancelFailures = statsProvider.makeCounter(OFFER_CANCEL_FAILURES); } @Override @@ -210,7 +214,14 @@ public interface OfferManager extends EventSubscriber { @Override public void cancelOffer(final OfferID offerId) { - removeFromHostOffers(offerId); + boolean success = removeFromHostOffers(offerId); + if (!success) { + // This will happen rarely when we race to process this rescind against accepting the offer + // to launch a task. + // If it happens frequently, we are likely processing rescinds before the offer itself. 
+ LOG.warn("Failed to cancel offer: {}.", offerId.getValue()); + this.offerCancelFailures.incrementAndGet(); + } } private boolean removeFromHostOffers(final OfferID offerId) { http://git-wip-us.apache.org/repos/asf/aurora/blob/73234d09/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java index 97febf2..be02449 100644 --- a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java @@ -51,6 +51,7 @@ import static org.apache.aurora.gen.MaintenanceMode.NONE; import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB; import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask; import static org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl.OFFER_ACCEPT_RACES; +import static org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl.OFFER_CANCEL_FAILURES; import static org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl.OUTSTANDING_OFFERS; import static org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl.STATICALLY_BANNED_OFFERS; import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange; @@ -234,6 +235,7 @@ public class OfferManagerImplTest extends EasyMockTest { assertEquals(1L, statsProvider.getLongValue(OUTSTANDING_OFFERS)); offerManager.cancelOffer(OFFER_A_ID); + assertEquals(0L, statsProvider.getLongValue(OFFER_CANCEL_FAILURES)); assertTrue(Iterables.isEmpty(offerManager.getOffers())); assertEquals(0L, statsProvider.getLongValue(OUTSTANDING_OFFERS)); @@ -361,6 +363,14 @@ public class OfferManagerImplTest extends EasyMockTest { } @Test + public void testCancelFailure() throws Exception { + control.replay(); + + 
offerManager.cancelOffer(OFFER_A.getOffer().getId()); + assertEquals(1L, statsProvider.getLongValue(OFFER_CANCEL_FAILURES)); + } + + @Test public void testDeclineOffer() throws Exception { driver.declineOffer(OFFER_A.getOffer().getId(), OFFER_FILTER); expectLastCall();
