Repository: aurora Updated Branches: refs/heads/master 53970139d -> a93538966
Provide a formal way to disable offer declining Increasing the offer hold time to effectively disable offer declines is a trap, as the queue of asynchronous declines will grow without bound. This introduces a command line argument to explicitly disable declining. Reviewed at https://reviews.apache.org/r/63157/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/a9353896 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/a9353896 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/a9353896 Branch: refs/heads/master Commit: a935389662b72486f49e507493557d360dc97178 Parents: 5397013 Author: Bill Farner <[email protected]> Authored: Thu Oct 19 19:39:02 2017 -0700 Committer: Bill Farner <[email protected]> Committed: Thu Oct 19 19:39:02 2017 -0700 ---------------------------------------------------------------------- RELEASE-NOTES.md | 5 +- .../aurora/benchmark/SchedulingBenchmarks.java | 7 +- .../aurora/scheduler/offers/Deferment.java | 70 +++++++ .../aurora/scheduler/offers/OfferManager.java | 54 +++-- .../aurora/scheduler/offers/OfferSettings.java | 25 +-- .../aurora/scheduler/offers/OffersModule.java | 46 +++-- .../scheduler/config/CommandLineTest.java | 2 + .../scheduler/offers/OfferManagerImplTest.java | 205 +++++++------------ 8 files changed, 222 insertions(+), 192 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/a9353896/RELEASE-NOTES.md ---------------------------------------------------------------------- diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md index f4cc416..1ec6d74 100644 --- a/RELEASE-NOTES.md +++ b/RELEASE-NOTES.md @@ -16,7 +16,10 @@ - Added the `thrift_method_interceptor_modules` scheduler flag that lets cluster operators inject custom Thrift method interceptors. - Increase default ZooKeeper session timeout from 4 to 15 seconds. -- Add option `-zk_connection_timeout` to control the connection timeout of ZooKeeper connections. +- Added option `-zk_connection_timeout` to control the connection timeout of ZooKeeper connections. +- Added scheduler command line argument `-hold_offers_forever`, suitable for use in clusters where + Aurora is the only framework. This setting disables other options such as `-min_offer_hold_time`, + and allows the scheduler to more efficiently cache scheduling attempts. ### Deprecations and removals: http://git-wip-us.apache.org/repos/asf/aurora/blob/a9353896/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java index 7d37668..5a9099b 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java +++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java @@ -55,6 +55,7 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter; import org.apache.aurora.scheduler.filter.SchedulingFilterImpl; import org.apache.aurora.scheduler.mesos.Driver; import org.apache.aurora.scheduler.mesos.TestExecutorSettings; +import org.apache.aurora.scheduler.offers.Deferment; import org.apache.aurora.scheduler.offers.OfferManager; import org.apache.aurora.scheduler.offers.OfferOrder; import org.apache.aurora.scheduler.offers.OfferSettings; @@ -148,13 +149,11 @@ public class SchedulingBenchmarks { // No-op. } }); + bind(Deferment.class).to(Deferment.Noop.class); bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class); bind(OfferManager.OfferManagerImpl.class).in(Singleton.class); bind(OfferSettings.class).toInstance( - new OfferSettings( - NO_DELAY, - () -> DELAY_FOREVER, - ImmutableList.of(OfferOrder.RANDOM))); + new OfferSettings(NO_DELAY, ImmutableList.of(OfferOrder.RANDOM))); bind(BiCache.BiCacheSettings.class).toInstance( new BiCache.BiCacheSettings(DELAY_FOREVER, "")); bind(TaskScheduler.class).to(TaskScheduler.TaskSchedulerImpl.class); http://git-wip-us.apache.org/repos/asf/aurora/blob/a9353896/src/main/java/org/apache/aurora/scheduler/offers/Deferment.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/offers/Deferment.java b/src/main/java/org/apache/aurora/scheduler/offers/Deferment.java new file mode 100644 index 0000000..f3ec886 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/offers/Deferment.java @@ -0,0 +1,70 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.offers; + +import javax.inject.Inject; + +import com.google.common.base.Supplier; + +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; +import org.apache.aurora.scheduler.async.DelayExecutor; + +import static java.util.Objects.requireNonNull; + +/** + * Determines if and when a deferred action should be performed. + */ +public interface Deferment { + + /** + * Defers an action to possibly be performed at some point in the future. + * + * @param action Callback to perform the deferred action. + */ + void defer(Runnable action); + + /** + * Never performs deferred actions. + */ + class Noop implements Deferment { + @Override + public void defer(Runnable action) { + // no-op + } + } + + /** + * Performs a deferred action after a dynamic delay. + */ + class DelayedDeferment implements Deferment { + private final Supplier<Amount<Long, Time>> delay; + private final DelayExecutor executor; + + @Inject + public DelayedDeferment( + Supplier<Amount<Long, Time>> delay, + @AsyncExecutor DelayExecutor executor) { + + this.delay = requireNonNull(delay); + this.executor = requireNonNull(executor); + } + + @Override + public void defer(Runnable action) { + executor.execute(action, delay.get()); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/a9353896/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java index e833431..7011a4c 100644 --- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java +++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java @@ -37,8 +37,6 @@ import org.apache.aurora.common.inject.TimedInterceptor.Timed; import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.stats.StatsProvider; import org.apache.aurora.scheduler.HostOffer; -import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; -import org.apache.aurora.scheduler.async.DelayExecutor; import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected; import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; @@ -165,7 +163,7 @@ public interface OfferManager extends EventSubscriber { private final Driver driver; private final OfferSettings offerSettings; - private final DelayExecutor executor; + private final Deferment offerDecline; @Inject @VisibleForTesting @@ -173,36 +171,28 @@ public interface OfferManager extends EventSubscriber { Driver driver, OfferSettings offerSettings, StatsProvider statsProvider, - @AsyncExecutor DelayExecutor executor) { + Deferment offerDecline) { this.driver = requireNonNull(driver); this.offerSettings = requireNonNull(offerSettings); - this.executor = requireNonNull(executor); this.hostOffers = new HostOffers(statsProvider, offerSettings.getOfferOrder()); this.offerRaces = statsProvider.makeCounter(OFFER_ACCEPT_RACES); this.offerCancelFailures = statsProvider.makeCounter(OFFER_CANCEL_FAILURES); + this.offerDecline = requireNonNull(offerDecline); } @Override - public void addOffer(final HostOffer offer) { - // We run a slight risk of a race here, which is acceptable. The worst case is that we - // temporarily hold two offers for the same host, which should be corrected when we return - // them after the return delay. - // There's also a chance that we return an offer for compaction ~simultaneously with the - // same-host offer being cancelled/returned. This is also fine. - Optional<HostOffer> sameSlave = hostOffers.get(offer.getOffer().getAgentId()); - if (sameSlave.isPresent()) { - // If there are existing offers for the slave, decline all of them so the master can - // compact all of those offers into a single offer and send them back. + public void addOffer(HostOffer offer) { + Optional<HostOffer> sameAgent = hostOffers.addAndPreventAgentCollision(offer); + if (sameAgent.isPresent()) { + // We have an existing offer for the same agent. We choose to return both offers so that + // they may be combined into a single offer. LOG.info("Returning offers for " + offer.getOffer().getAgentId().getValue() + " for compaction."); decline(offer.getOffer().getId()); - removeAndDecline(sameSlave.get().getOffer().getId()); + decline(sameAgent.get().getOffer().getId()); } else { - hostOffers.add(offer); - executor.execute( - () -> removeAndDecline(offer.getOffer().getId()), - offerSettings.getOfferReturnDelay()); + offerDecline.defer(() -> removeAndDecline(offer.getOffer().getId())); } } @@ -326,7 +316,27 @@ public interface OfferManager extends EventSubscriber { return Optional.of(offer); } - synchronized void add(HostOffer offer) { + /** + * Adds an offer while maintaining a guarantee that no two offers may exist with the same + * agent ID. If an offer exists with the same agent ID, the existing offer is removed + * and returned, and {@code offer} is not added. + * + * @param offer Offer to add. + * @return The pre-existing offer with the same agent ID as {@code offer}, if one exists, + * which will also be removed prior to returning. + */ + synchronized Optional<HostOffer> addAndPreventAgentCollision(HostOffer offer) { + HostOffer sameAgent = offersBySlave.get(offer.getOffer().getAgentId()); + if (sameAgent != null) { + remove(sameAgent.getOffer().getId()); + return Optional.of(sameAgent); + } + + addInternal(offer); + return Optional.absent(); + } + + private void addInternal(HostOffer offer) { offers.add(offer); offersById.put(offer.getOffer().getId(), offer); offersBySlave.put(offer.getOffer().getAgentId(), offer); @@ -350,7 +360,7 @@ public interface OfferManager extends EventSubscriber { if (offer != null) { // Remove and re-add a host's offer to re-sort based on its new hostStatus remove(offer.getOffer().getId()); - add(new HostOffer(offer.getOffer(), attributes)); + addInternal(new HostOffer(offer.getOffer(), attributes)); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/a9353896/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java index 4c6fd54..e060f20 100644 --- a/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java +++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java @@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.offers; import java.util.List; -import com.google.common.base.Supplier; import com.google.common.collect.Ordering; import org.apache.aurora.common.quantity.Amount; @@ -30,24 +29,14 @@ import static java.util.Objects.requireNonNull; public class OfferSettings { private final Amount<Long, Time> offerFilterDuration; - private final Supplier<Amount<Long, Time>> returnDelaySupplier; private final Ordering<HostOffer> offerOrder; - public OfferSettings( - Amount<Long, Time> offerFilterDuration, - Supplier<Amount<Long, Time>> returnDelaySupplier, - List<OfferOrder> offerOrder) { - - this(offerFilterDuration, returnDelaySupplier, OfferOrderBuilder.create(offerOrder)); + public OfferSettings(Amount<Long, Time> offerFilterDuration, List<OfferOrder> offerOrder) { + this(offerFilterDuration, OfferOrderBuilder.create(offerOrder)); } - OfferSettings( - Amount<Long, Time> offerFilterDuration, - Supplier<Amount<Long, Time>> returnDelaySupplier, - Ordering<HostOffer> offerOrder) { - + OfferSettings(Amount<Long, Time> offerFilterDuration, Ordering<HostOffer> offerOrder) { this.offerFilterDuration = requireNonNull(offerFilterDuration); - this.returnDelaySupplier = requireNonNull(returnDelaySupplier); this.offerOrder = requireNonNull(offerOrder); } @@ -59,14 +48,6 @@ public class OfferSettings { } /** - * The amount of time after which an unused offer should be 'returned' to Mesos by declining it. - * The delay is calculated for each offer using a random duration within a fixed window. - */ - public Amount<Long, Time> getOfferReturnDelay() { - return returnDelaySupplier.get(); - } - - /** * The ordering to use when fetching offers from OfferManager. */ public Ordering<HostOffer> getOfferOrder() { http://git-wip-us.apache.org/repos/asf/aurora/blob/a9353896/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java b/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java index ab98add..e6b2c55 100644 --- a/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java +++ b/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java @@ -22,6 +22,7 @@ import javax.inject.Singleton; import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameters; +import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.Ordering; import com.google.inject.AbstractModule; @@ -55,6 +56,12 @@ public class OffersModule extends AbstractModule { @Parameters(separators = "=") public static class Options { + @Parameter(names = "-hold_offers_forever", + description = + "Hold resource offers indefinitely, disabling automatic offer decline settings.", + arity = 1) + public boolean holdOffersForever = false; + @Parameter(names = "-min_offer_hold_time", validateValueWith = NotNegativeAmount.class, description = "Minimum amount of time to hold a resource offer before declining.") @@ -122,15 +129,17 @@ public class OffersModule extends AbstractModule { @Override protected void configure() { Options options = cliOptions.offer; - long offerHoldTime = - options.offerHoldJitterWindow.as(Time.SECONDS) + options.minOfferHoldTime.as(Time.SECONDS); - if (options.unavailabilityThreshold.as(Time.SECONDS) < offerHoldTime) { - LOG.warn("unavailability_threshold ({}) is less than the sum of min_offer_hold_time ({}) and" - + "offer_hold_jitter_window ({}). This creates risks of races between launching and" - + "draining", - options.unavailabilityThreshold, - options.minOfferHoldTime, - options.offerHoldJitterWindow); + if (!options.holdOffersForever) { + long offerHoldTime = options.offerHoldJitterWindow.as(Time.SECONDS) + + options.minOfferHoldTime.as(Time.SECONDS); + if (options.unavailabilityThreshold.as(Time.SECONDS) < offerHoldTime) { + LOG.warn("unavailability_threshold ({}) is less than the sum of min_offer_hold_time ({})" + + " and offer_hold_jitter_window ({}). This creates risks of races between " + + "launching and draining", + options.unavailabilityThreshold, + options.minOfferHoldTime, + options.offerHoldJitterWindow); + } } for (Module module: MoreModules.instantiateAll(options.offerOrderModules, cliOptions)) { @@ -144,6 +153,17 @@ public class OffersModule extends AbstractModule { install(new PrivateModule() { @Override protected void configure() { + if (options.holdOffersForever) { + bind(Deferment.class).to(Deferment.Noop.class); + } else { + bind(new TypeLiteral<Supplier<Amount<Long, Time>>>() { }).toInstance( + new RandomJitterReturnDelay( + options.minOfferHoldTime.as(Time.MILLISECONDS), + options.offerHoldJitterWindow.as(Time.MILLISECONDS), + Random.Util.newDefaultRandom())); + bind(Deferment.class).to(Deferment.DelayedDeferment.class); + } + bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class); bind(OfferManager.OfferManagerImpl.class).in(Singleton.class); expose(OfferManager.class); @@ -155,12 +175,6 @@ public class OffersModule extends AbstractModule { @Provides @Singleton OfferSettings provideOfferSettings(Ordering<HostOffer> offerOrdering) { - return new OfferSettings( - cliOptions.offer.offerFilterDuration, - new RandomJitterReturnDelay( - cliOptions.offer.minOfferHoldTime.as(Time.MILLISECONDS), - cliOptions.offer.offerHoldJitterWindow.as(Time.MILLISECONDS), - Random.Util.newDefaultRandom()), - offerOrdering); + return new OfferSettings(cliOptions.offer.offerFilterDuration, offerOrdering); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/a9353896/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java index 8f4f63c..5b50244 100644 --- a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java +++ b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java @@ -117,6 +117,7 @@ public class CommandLineTest { expected.reconciliation.reconciliationScheduleSpread = TEST_TIME; expected.reconciliation.reconciliationBatchSize = 42; expected.reconciliation.reconciliationBatchInterval = TEST_TIME; + expected.offer.holdOffersForever = true; expected.offer.minOfferHoldTime = TEST_TIME; expected.offer.offerHoldJitterWindow = TEST_TIME; expected.offer.offerFilterDuration = TEST_TIME; @@ -269,6 +270,7 @@ public class CommandLineTest { "-reconciliation_schedule_spread=42days", "-reconciliation_explicit_batch_size=42", "-reconciliation_explicit_batch_interval=42days", + "-hold_offers_forever=true", "-min_offer_hold_time=42days", "-offer_hold_jitter_window=42days", "-offer_filter_duration=42days", http://git-wip-us.apache.org/repos/asf/aurora/blob/a9353896/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java index 815a7e8..6c8434e 100644 --- a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java @@ -32,6 +32,7 @@ import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected; import org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged; import org.apache.aurora.scheduler.mesos.Driver; +import org.apache.aurora.scheduler.offers.Deferment.Noop; import org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl; import org.apache.aurora.scheduler.resources.ResourceType; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; @@ -101,42 +102,31 @@ public class OfferManagerImplTest extends EasyMockTest { .setLaunch(Operation.Launch.newBuilder().addTaskInfos(TASK_INFO)) .build(); private static final List<Operation> OPERATIONS = ImmutableList.of(launch); - private static final long OFFER_FILTER_SECONDS = 0L; + private static final long OFFER_FILTER_SECONDS = 0; private static final Filters OFFER_FILTER = Filters.newBuilder() .setRefuseSeconds(OFFER_FILTER_SECONDS) .build(); private Driver driver; - private FakeScheduledExecutor clock; private OfferManagerImpl offerManager; private FakeStatsProvider statsProvider; @Before public void setUp() { driver = createMock(Driver.class); - DelayExecutor executorMock = createMock(DelayExecutor.class); - clock = FakeScheduledExecutor.fromDelayExecutor(executorMock); - addTearDown(clock::assertEmpty); OfferSettings offerSettings = new OfferSettings( Amount.of(OFFER_FILTER_SECONDS, Time.SECONDS), - () -> RETURN_DELAY, ImmutableList.of(OfferOrder.RANDOM)); statsProvider = new FakeStatsProvider(); - offerManager = new OfferManagerImpl(driver, offerSettings, statsProvider, executorMock); + offerManager = new OfferManagerImpl(driver, offerSettings, statsProvider, new Noop()); } @Test - public void testOffersSortedByUnavailability() throws Exception { - clock.advance(Amount.of(1L, Time.HOURS)); - - HostOffer hostOfferB = setUnavailability(OFFER_B, clock.nowMillis()); - Long offerCStartTime = clock.nowMillis() + ONE_HOUR.as(Time.MILLISECONDS); + public void testOffersSortedByUnavailability() { + HostOffer hostOfferB = setUnavailability(OFFER_B, 1); + long offerCStartTime = ONE_HOUR.as(Time.MILLISECONDS); HostOffer hostOfferC = setUnavailability(OFFER_C, offerCStartTime); - driver.declineOffer(OFFER_B.getOffer().getId(), OFFER_FILTER); - driver.declineOffer(OFFER_A.getOffer().getId(), OFFER_FILTER); - driver.declineOffer(OFFER_C.getOffer().getId(), OFFER_FILTER); - control.replay(); offerManager.addOffer(hostOfferB); @@ -149,8 +139,6 @@ public class OfferManagerImplTest extends EasyMockTest { // hostOfferC has a further away start time, so it should be preferred. ImmutableList.of(OFFER_A, hostOfferC, hostOfferB), actual); - - clock.advance(RETURN_DELAY); } @Test @@ -163,35 +151,23 @@ public class OfferManagerImplTest extends EasyMockTest { driver.acceptOffers(OFFER_B.getOffer().getId(), OPERATIONS, OFFER_FILTER); expectLastCall(); - driver.declineOffer(OFFER_A_ID, OFFER_FILTER); - expectLastCall(); - driver.declineOffer(offerC.getOffer().getId(), OFFER_FILTER); - expectLastCall(); - control.replay(); offerManager.addOffer(offerA); - assertEquals(1L, statsProvider.getLongValue(OUTSTANDING_OFFERS)); + assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS)); offerManager.addOffer(OFFER_B); - assertEquals(2L, statsProvider.getLongValue(OUTSTANDING_OFFERS)); + assertEquals(2, statsProvider.getLongValue(OUTSTANDING_OFFERS)); offerManager.addOffer(offerC); - assertEquals(3L, statsProvider.getLongValue(OUTSTANDING_OFFERS)); + assertEquals(3, statsProvider.getLongValue(OUTSTANDING_OFFERS)); assertEquals( ImmutableSet.of(OFFER_B, offerA, offerC), ImmutableSet.copyOf(offerManager.getOffers())); offerManager.launchTask(OFFER_B.getOffer().getId(), TASK_INFO); - assertEquals(2L, statsProvider.getLongValue(OUTSTANDING_OFFERS)); - clock.advance(RETURN_DELAY); - assertEquals(0L, statsProvider.getLongValue(OUTSTANDING_OFFERS)); + assertEquals(2, statsProvider.getLongValue(OUTSTANDING_OFFERS)); } @Test - public void hostAttributeChangeUpdatesOfferSorting() throws Exception { - driver.declineOffer(OFFER_A_ID, OFFER_FILTER); - expectLastCall(); - driver.declineOffer(OFFER_B.getOffer().getId(), OFFER_FILTER); - expectLastCall(); - + public void hostAttributeChangeUpdatesOfferSorting() { control.replay(); offerManager.hostAttributesChanged(new HostAttributesChanged(HOST_ATTRIBUTES_A)); @@ -209,8 +185,6 @@ public class OfferManagerImplTest extends EasyMockTest { offerManager.hostAttributesChanged(new HostAttributesChanged(offerA.getAttributes())); offerManager.hostAttributesChanged(new HostAttributesChanged(offerB.getAttributes())); assertEquals(ImmutableSet.of(OFFER_A, OFFER_B), ImmutableSet.copyOf(offerManager.getOffers())); - - clock.advance(RETURN_DELAY); } @Test @@ -221,108 +195,84 @@ public class OfferManagerImplTest extends EasyMockTest { control.replay(); offerManager.addOffer(OFFER_A); - assertEquals(1L, statsProvider.getLongValue(OUTSTANDING_OFFERS)); + assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS)); offerManager.addOffer(OFFER_A); - assertEquals(0L, statsProvider.getLongValue(OUTSTANDING_OFFERS)); - - clock.advance(RETURN_DELAY); + assertEquals(0, statsProvider.getLongValue(OUTSTANDING_OFFERS)); } @Test - public void testGetOffersReturnsAllOffers() throws Exception { + public void testGetOffersReturnsAllOffers() { control.replay(); offerManager.addOffer(OFFER_A); assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers())); - assertEquals(1L, statsProvider.getLongValue(OUTSTANDING_OFFERS)); + assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS)); offerManager.cancelOffer(OFFER_A_ID); - assertEquals(0L, statsProvider.getLongValue(OFFER_CANCEL_FAILURES)); + assertEquals(0, statsProvider.getLongValue(OFFER_CANCEL_FAILURES)); assertTrue(Iterables.isEmpty(offerManager.getOffers())); - assertEquals(0L, statsProvider.getLongValue(OUTSTANDING_OFFERS)); - - clock.advance(RETURN_DELAY); + assertEquals(0, statsProvider.getLongValue(OUTSTANDING_OFFERS)); } @Test - public void testOfferFilteringDueToStaticBan() throws Exception { - driver.declineOffer(OFFER_A_ID, OFFER_FILTER); - expectLastCall(); - + public void testOfferFilteringDueToStaticBan() { control.replay(); // Static ban ignored when now offers. offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY); - assertEquals(0L, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); + assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); offerManager.addOffer(OFFER_A); assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY))); assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers())); // Add static ban. offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY); - assertEquals(1L, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); + assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers())); assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY))); - - clock.advance(RETURN_DELAY); - assertEquals(0L, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); } @Test - public void testStaticBanIsClearedOnOfferReturn() throws Exception { - driver.declineOffer(OFFER_A_ID, OFFER_FILTER); - expectLastCall().times(2); - + public void testStaticBanIsClearedOnOfferReturn() { control.replay(); offerManager.addOffer(OFFER_A); offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY); assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers())); assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY))); - assertEquals(1L, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); + assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); // Make sure the static ban is cleared when the offers are returned. - clock.advance(RETURN_DELAY); + offerManager.cancelOffer(OFFER_A_ID); offerManager.addOffer(OFFER_A); assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY))); - assertEquals(0L, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); - - clock.advance(RETURN_DELAY); + assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); } @Test - public void testStaticBanIsClearedOnDriverDisconnect() throws Exception { - driver.declineOffer(OFFER_A_ID, OFFER_FILTER); - expectLastCall(); - + public void testStaticBanIsClearedOnDriverDisconnect() { control.replay(); offerManager.addOffer(OFFER_A); offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY); assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers())); assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY))); - assertEquals(1L, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); + assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); // Make sure the static ban is cleared when driver is disconnected. offerManager.driverDisconnected(new DriverDisconnected()); - assertEquals(0L, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); + assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); offerManager.addOffer(OFFER_A); assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY))); - - clock.advance(RETURN_DELAY); } @Test - public void getOffer() { - driver.declineOffer(OFFER_A_ID, OFFER_FILTER); - expectLastCall(); - + public void testGetOffer() { control.replay(); offerManager.addOffer(OFFER_A); assertEquals(Optional.of(OFFER_A), offerManager.getOffer(OFFER_A.getOffer().getAgentId())); - assertEquals(1L, statsProvider.getLongValue(OUTSTANDING_OFFERS)); - clock.advance(RETURN_DELAY); + assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS)); } @Test(expected = OfferManager.LaunchException.class) @@ -333,12 +283,7 @@ public class OfferManagerImplTest extends EasyMockTest { control.replay(); offerManager.addOffer(OFFER_A); - - try { - offerManager.launchTask(OFFER_A_ID, TASK_INFO); - } finally { - clock.advance(RETURN_DELAY); - } + offerManager.launchTask(OFFER_A_ID, TASK_INFO); } @Test @@ -348,66 +293,48 @@ public class OfferManagerImplTest extends EasyMockTest { offerManager.launchTask(OFFER_A_ID, TASK_INFO); fail("Method invocation is expected to throw exception."); } catch (OfferManager.LaunchException e) { - assertEquals(1L, statsProvider.getLongValue(OFFER_ACCEPT_RACES)); + assertEquals(1, statsProvider.getLongValue(OFFER_ACCEPT_RACES)); } } @Test - public void testFlushOffers() throws Exception { + public void testFlushOffers() { control.replay(); offerManager.addOffer(OFFER_A); offerManager.addOffer(OFFER_B); - assertEquals(2L, statsProvider.getLongValue(OUTSTANDING_OFFERS)); + assertEquals(2, statsProvider.getLongValue(OUTSTANDING_OFFERS)); offerManager.driverDisconnected(new DriverDisconnected()); - assertEquals(0L, statsProvider.getLongValue(OUTSTANDING_OFFERS)); - clock.advance(RETURN_DELAY); + assertEquals(0, statsProvider.getLongValue(OUTSTANDING_OFFERS)); } @Test - public void testCancelFailure() throws Exception { + public void testCancelFailure() { control.replay(); offerManager.cancelOffer(OFFER_A.getOffer().getId()); - assertEquals(1L, statsProvider.getLongValue(OFFER_CANCEL_FAILURES)); - } - - @Test - public void testDeclineOffer() throws Exception { - driver.declineOffer(OFFER_A.getOffer().getId(), OFFER_FILTER); - expectLastCall(); - - control.replay(); - - offerManager.addOffer(OFFER_A); - assertEquals(1L, statsProvider.getLongValue(OUTSTANDING_OFFERS)); - clock.advance(RETURN_DELAY); - assertEquals(0L, statsProvider.getLongValue(OUTSTANDING_OFFERS)); + assertEquals(1, statsProvider.getLongValue(OFFER_CANCEL_FAILURES)); } @Test - public void testBanAndUnbanOffer() throws Exception { - driver.declineOffer(OFFER_A.getOffer().getId(), OFFER_FILTER); - expectLastCall().times(2); + public void testBanAndUnbanOffer() { control.replay(); // After adding a banned offer, user can see it is in OUTSTANDING_OFFERS but cannot retrieve it. offerManager.banOffer(OFFER_A_ID); offerManager.addOffer(OFFER_A); - assertEquals(1L, statsProvider.getLongValue(OUTSTANDING_OFFERS)); - assertEquals(1L, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS)); + assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS)); + assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS)); assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY))); - clock.advance(RETURN_DELAY); offerManager.cancelOffer(OFFER_A_ID); offerManager.addOffer(OFFER_A); - assertEquals(1L, statsProvider.getLongValue(OUTSTANDING_OFFERS)); - assertEquals(0L, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS)); + assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS)); + assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS)); assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY))); - clock.advance(RETURN_DELAY); } - private static HostOffer setUnavailability(HostOffer offer, Long startMs) { + private static HostOffer setUnavailability(HostOffer offer, long startMs) { Unavailability unavailability = Unavailability.newBuilder() .setStart(TimeInfo.newBuilder().setNanoseconds(startMs * 1000L)).build(); return new HostOffer( @@ -422,17 +349,13 @@ public class OfferManagerImplTest extends EasyMockTest { } private OfferManager createOrderedManager(List<OfferOrder> order) { - OfferSettings settings = new OfferSettings( - Amount.of(OFFER_FILTER_SECONDS, Time.SECONDS), - () -> RETURN_DELAY, - order); - DelayExecutor executorMock = createMock(DelayExecutor.class); - clock = FakeScheduledExecutor.fromDelayExecutor(executorMock); - return new OfferManagerImpl(driver, settings, statsProvider, executorMock); + OfferSettings settings = + new OfferSettings(Amount.of(OFFER_FILTER_SECONDS, Time.SECONDS), order); + return new OfferManagerImpl(driver, settings, statsProvider, new Noop()); } @Test - public void testCPUOrdering() throws Exception { + public void testCPUOrdering() { OfferManager cpuManager = createOrderedManager(ImmutableList.of(OfferOrder.CPU)); HostOffer small = setMode(new HostOffer( @@ -464,7 +387,7 @@ public class OfferManagerImplTest extends EasyMockTest { } @Test - public void testRevocableCPUOrdering() throws Exception { + public void testRevocableCPUOrdering() { ResourceType.initializeEmptyCliArgsForTest(); OfferManager cpuManager = createOrderedManager(ImmutableList.of(OfferOrder.REVOCABLE_CPU)); @@ -501,7 +424,7 @@ public class OfferManagerImplTest extends EasyMockTest { } @Test - public void testDiskOrdering() throws Exception { + public void testDiskOrdering() { OfferManager cpuManager = createOrderedManager(ImmutableList.of(OfferOrder.DISK)); HostOffer small = setMode(new HostOffer( @@ -529,7 +452,7 @@ public class OfferManagerImplTest extends EasyMockTest { } @Test - public void testMemoryOrdering() throws Exception { + public void testMemoryOrdering() { OfferManager cpuManager = createOrderedManager(ImmutableList.of(OfferOrder.MEMORY)); HostOffer small = setMode(new HostOffer( @@ -557,7 +480,7 @@ public class OfferManagerImplTest extends EasyMockTest { } @Test - public void testCPUMemoryOrdering() throws Exception { + public void testCPUMemoryOrdering() { OfferManager cpuManager = createOrderedManager( ImmutableList.of(OfferOrder.CPU, OfferOrder.MEMORY)); @@ -594,4 +517,32 @@ public class OfferManagerImplTest extends EasyMockTest { assertEquals(ImmutableList.of(small, medium, large), ImmutableList.copyOf(cpuManager.getOffers())); } + + @Test + public void testDelayedOfferReturn() { + OfferSettings settings = new OfferSettings( + Amount.of(OFFER_FILTER_SECONDS, Time.SECONDS), + ImmutableList.of(OfferOrder.RANDOM)); + DelayExecutor executorMock = createMock(DelayExecutor.class); + FakeScheduledExecutor clock = FakeScheduledExecutor.fromDelayExecutor(executorMock); + addTearDown(clock::assertEmpty); + offerManager = new OfferManagerImpl( + driver, + settings, + statsProvider, + new Deferment.DelayedDeferment(() -> RETURN_DELAY, executorMock)); + + driver.declineOffer(OFFER_A_ID, OFFER_FILTER); + + control.replay(); + + offerManager.addOffer(OFFER_A); + offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY); + assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS)); + assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); + + clock.advance(RETURN_DELAY); + assertEquals(0, statsProvider.getLongValue(OUTSTANDING_OFFERS)); + assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); + } }
