Repository: aurora
Updated Branches:
  refs/heads/master 8adc9bd87 -> e8a61f68c
Fixing slave/task reservation check. Bugs closed: AURORA-1431 Reviewed at https://reviews.apache.org/r/37206/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/e8a61f68 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/e8a61f68 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/e8a61f68 Branch: refs/heads/master Commit: e8a61f68ce87e51629f85801b044c9a2cda7ba0c Parents: 8adc9bd Author: Maxim Khutornenko <[email protected]> Authored: Fri Aug 7 10:12:30 2015 -0700 Committer: Maxim Khutornenko <[email protected]> Committed: Fri Aug 7 10:12:30 2015 -0700 ---------------------------------------------------------------------- .../scheduler/filter/SchedulingFilterImpl.java | 3 + .../aurora/scheduler/preemptor/BiCache.java | 11 ++++ .../scheduler/scheduling/SchedulingModule.java | 2 +- .../scheduler/scheduling/TaskScheduler.java | 12 ++-- .../aurora/scheduler/state/TaskAssigner.java | 17 ++++-- .../aurora/scheduler/preemptor/BiCacheTest.java | 14 +++++ .../scheduling/TaskSchedulerImplTest.java | 57 +++++++++++------- .../scheduler/state/TaskAssignerImplTest.java | 63 ++++++++++++++++++-- 8 files changed, 137 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/e8a61f68/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java index 54ffd8e..08d7ac7 100644 --- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java @@ -23,6 +23,8 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import 
com.google.common.collect.Ordering; import com.google.inject.Inject; + +import com.twitter.common.inject.TimedInterceptor.Timed; import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Data; @@ -169,6 +171,7 @@ public class SchedulingFilterImpl implements SchedulingFilter { new ConstraintMatcher.NameFilter(DEDICATED_ATTRIBUTE)); } + @Timed("scheduling_filter") @Override public Set<Veto> filter(UnusedResource resource, ResourceRequest request) { // Apply veto filtering rules from higher to lower score making sure we cut over and return http://git-wip-us.apache.org/repos/asf/aurora/blob/e8a61f68/src/main/java/org/apache/aurora/scheduler/preemptor/BiCache.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/BiCache.java b/src/main/java/org/apache/aurora/scheduler/preemptor/BiCache.java index 2551057..98d18df 100644 --- a/src/main/java/org/apache/aurora/scheduler/preemptor/BiCache.java +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/BiCache.java @@ -13,6 +13,7 @@ */ package org.apache.aurora.scheduler.preemptor; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -26,6 +27,7 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimap; import com.twitter.common.quantity.Amount; @@ -136,4 +138,13 @@ public class BiCache<K, V> { inverse.remove(value, key); cache.invalidate(key); } + + /** + * Returns an immutable copy of entries stored in this cache. + * + * @return Immutable map of cache entries. 
+ */ + public synchronized Map<K, V> asMap() { + return ImmutableMap.copyOf(cache.asMap()); + } } http://git-wip-us.apache.org/repos/asf/aurora/blob/e8a61f68/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java index b9dccc6..c7a1a46 100644 --- a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java @@ -112,7 +112,7 @@ public class SchedulingModule extends AbstractModule { install(new PrivateModule() { @Override protected void configure() { - bind(new TypeLiteral<BiCache<TaskGroupKey, String>>() { }).in(Singleton.class); + bind(new TypeLiteral<BiCache<String, TaskGroupKey>>() { }).in(Singleton.class); bind(BiCache.BiCacheSettings.class).toInstance( new BiCache.BiCacheSettings(RESERVATION_DURATION.get(), "reservation_cache_size")); bind(TaskScheduler.class).to(TaskScheduler.TaskSchedulerImpl.class); http://git-wip-us.apache.org/repos/asf/aurora/blob/e8a61f68/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java index 0f0bfca..7761745 100644 --- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java @@ -87,7 +87,7 @@ public interface TaskScheduler extends EventSubscriber { private final Storage storage; private final TaskAssigner assigner; private final Preemptor preemptor; - private final BiCache<TaskGroupKey, String> reservations; + private final BiCache<String, 
TaskGroupKey> reservations; private final AtomicLong attemptsFired = Stats.exportLong("schedule_attempts_fired"); private final AtomicLong attemptsFailed = Stats.exportLong("schedule_attempts_failed"); @@ -98,7 +98,7 @@ public interface TaskScheduler extends EventSubscriber { Storage storage, TaskAssigner assigner, Preemptor preemptor, - BiCache<TaskGroupKey, String> reservations) { + BiCache<String, TaskGroupKey> reservations) { this.storage = requireNonNull(storage); this.assigner = requireNonNull(assigner); @@ -146,7 +146,7 @@ public interface TaskScheduler extends EventSubscriber { new ResourceRequest(task, aggregate), TaskGroupKey.from(task), taskId, - reservations.get(TaskGroupKey.from(task))); + reservations.asMap()); if (!launched) { // Task could not be scheduled. @@ -167,12 +167,12 @@ public interface TaskScheduler extends EventSubscriber { AttributeAggregate jobState, MutableStoreProvider storeProvider) { - if (reservations.get(TaskGroupKey.from(task.getTask())).isPresent()) { + if (!reservations.getByValue(TaskGroupKey.from(task.getTask())).isEmpty()) { return; } Optional<String> slaveId = preemptor.attemptPreemptionFor(task, jobState, storeProvider); if (slaveId.isPresent()) { - reservations.put(TaskGroupKey.from(task.getTask()), slaveId.get()); + reservations.put(slaveId.get(), TaskGroupKey.from(task.getTask())); } } @@ -181,7 +181,7 @@ public interface TaskScheduler extends EventSubscriber { if (Optional.of(PENDING).equals(stateChangeEvent.getOldState())) { IAssignedTask assigned = stateChangeEvent.getTask().getAssignedTask(); if (assigned.getSlaveId() != null) { - reservations.remove(TaskGroupKey.from(assigned.getTask()), assigned.getSlaveId()); + reservations.remove(assigned.getSlaveId(), TaskGroupKey.from(assigned.getTask())); } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/e8a61f68/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java index 0e32990..ae59efa 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java +++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java @@ -27,6 +27,8 @@ import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.FluentIterable; + +import com.twitter.common.inject.TimedInterceptor.Timed; import com.twitter.common.stats.Stats; import org.apache.aurora.scheduler.HostOffer; @@ -62,7 +64,7 @@ public interface TaskAssigner { * @param resourceRequest The request for resources being scheduled. * @param groupKey Task group key. * @param taskId Task id to assign. - * @param slaveReservation Slave reservation for a given {@code groupKey}. + * @param slaveReservations Slave reservations. * @return Assignment result. */ boolean maybeAssign( @@ -70,7 +72,7 @@ public interface TaskAssigner { ResourceRequest resourceRequest, TaskGroupKey groupKey, String taskId, - Optional<String> slaveReservation); + Map<String, TaskGroupKey> slaveReservations); class TaskAssignerImpl implements TaskAssigner { private static final Logger LOG = Logger.getLogger(TaskAssignerImpl.class.getName()); @@ -129,18 +131,21 @@ public interface TaskAssigner { return taskFactory.createFrom(assigned, offer.getSlaveId()); } + @Timed("assigner_maybe_assign") @Override public boolean maybeAssign( MutableStoreProvider storeProvider, ResourceRequest resourceRequest, TaskGroupKey groupKey, String taskId, - Optional<String> slaveReservation) { + Map<String, TaskGroupKey> slaveReservations) { for (HostOffer offer : offerManager.getOffers(groupKey)) { - if (slaveReservation.isPresent() - && !slaveReservation.get().equals(offer.getOffer().getSlaveId().getValue())) { - // Task group has a slave reserved but this offer is for a different slave -> skip. 
+ Optional<TaskGroupKey> reservedGroup = Optional.fromNullable( + slaveReservations.get(offer.getOffer().getSlaveId().getValue())); + + if (reservedGroup.isPresent() && !reservedGroup.get().equals(groupKey)) { + // This slave is reserved for a different task group -> skip. continue; } Set<Veto> vetoes = filter.filter( http://git-wip-us.apache.org/repos/asf/aurora/blob/e8a61f68/src/test/java/org/apache/aurora/scheduler/preemptor/BiCacheTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/BiCacheTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/BiCacheTest.java index 7312091..f15bcd3 100644 --- a/src/test/java/org/apache/aurora/scheduler/preemptor/BiCacheTest.java +++ b/src/test/java/org/apache/aurora/scheduler/preemptor/BiCacheTest.java @@ -13,7 +13,10 @@ */ package org.apache.aurora.scheduler.preemptor; +import java.util.Map; + import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Time; @@ -62,6 +65,7 @@ public class BiCacheTest { biCache.put(KEY_1, 1); assertEquals(1L, statsProvider.getLongValue(STAT_NAME)); assertEquals(Optional.of(1), biCache.get(KEY_1)); + assertEquals(NO_VALUE, biCache.get(KEY_2)); biCache.remove(KEY_1, 1); assertEquals(NO_VALUE, biCache.get(KEY_1)); assertEquals(0L, statsProvider.getLongValue(STAT_NAME)); @@ -104,4 +108,14 @@ public class BiCacheTest { assertEquals(ImmutableSet.of(KEY_1), biCache.getByValue(2)); assertEquals(1L, statsProvider.getLongValue(STAT_NAME)); } + + @Test + public void testAsMap() { + biCache.put(KEY_1, 1); + assertEquals(Optional.of(1), biCache.get(KEY_1)); + Map<String, Integer> map = biCache.asMap(); + + biCache.put(KEY_1, 2); + assertEquals(ImmutableMap.of(KEY_1, 1), map); + } } 
http://git-wip-us.apache.org/repos/asf/aurora/blob/e8a61f68/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java index 350ec6f..492334b 100644 --- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java @@ -13,8 +13,11 @@ */ package org.apache.aurora.scheduler.scheduling; +import java.util.Map; + import com.google.common.base.Function; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.inject.AbstractModule; import com.google.inject.Guice; @@ -65,14 +68,16 @@ public class TaskSchedulerImplTest extends EasyMockTest { private static final IScheduledTask TASK_A = TaskTestUtil.makeTask("a", JobKeys.from("a", "a", "a")); + private static final TaskGroupKey GROUP_KEY = + TaskGroupKey.from(TASK_A.getAssignedTask().getTask()); private static final String SLAVE_ID = "HOST_A"; - private static final Optional<String> NO_RESERVATION = Optional.absent(); + private static final Map<String, TaskGroupKey> NO_RESERVATION = ImmutableMap.of(); private StorageTestUtil storageUtil; private TaskAssigner assigner; private TaskScheduler scheduler; private Preemptor preemptor; - private BiCache<TaskGroupKey, String> reservations; + private BiCache<String, TaskGroupKey> reservations; private EventSink eventSink; @Before @@ -80,7 +85,7 @@ public class TaskSchedulerImplTest extends EasyMockTest { storageUtil = new StorageTestUtil(this); assigner = createMock(TaskAssigner.class); preemptor = createMock(Preemptor.class); - reservations = createMock(new Clazz<BiCache<TaskGroupKey, String>>() { }); + reservations = 
createMock(new Clazz<BiCache<String, TaskGroupKey>>() { }); Injector injector = getInjector(storageUtil.storage); scheduler = injector.getInstance(TaskScheduler.class); @@ -93,7 +98,7 @@ public class TaskSchedulerImplTest extends EasyMockTest { new AbstractModule() { @Override protected void configure() { - bind(new TypeLiteral<BiCache<TaskGroupKey, String>>() { }).toInstance(reservations); + bind(new TypeLiteral<BiCache<String, TaskGroupKey>>() { }).toInstance(reservations); bind(TaskScheduler.class).to(TaskSchedulerImpl.class); bind(Preemptor.class).toInstance(preemptor); bind(TaskAssigner.class).toInstance(assigner); @@ -113,21 +118,21 @@ public class TaskSchedulerImplTest extends EasyMockTest { private IExpectationSetters<Boolean> expectAssigned( IScheduledTask task, - Optional<String> reservation) { + Map<String, TaskGroupKey> reservationMap) { return expect(assigner.maybeAssign( storageUtil.mutableStoreProvider, new ResourceRequest(task.getAssignedTask().getTask(), EMPTY), TaskGroupKey.from(task.getAssignedTask().getTask()), Tasks.id(task), - reservation)); + reservationMap)); } @Test public void testSchedule() throws Exception { storageUtil.expectOperations(); - expectReservationCheck(TASK_A); + expectAsMap(NO_RESERVATION); expectTaskStillPendingQuery(TASK_A); expectActiveJobFetch(TASK_A); expectAssigned(TASK_A, NO_RESERVATION).andReturn(true); @@ -157,22 +162,24 @@ public class TaskSchedulerImplTest extends EasyMockTest { expectTaskStillPendingQuery(TASK_A); expectActiveJobFetch(TASK_A); expectAssigned(TASK_A, NO_RESERVATION).andReturn(false); - expectReservationCheck(TASK_A).times(2); - expectPreemptorCall(TASK_A, NO_RESERVATION); + expectAsMap(NO_RESERVATION); + expectNoReservation(TASK_A); + expectPreemptorCall(TASK_A, Optional.<String>absent()); // Slave is reserved. 
expectTaskStillPendingQuery(TASK_A); expectActiveJobFetch(TASK_A); expectAssigned(TASK_A, NO_RESERVATION).andReturn(false); - expectReservationCheck(TASK_A).times(2); + expectAsMap(NO_RESERVATION); + expectNoReservation(TASK_A); expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID)); expectAddReservation(TASK_A, SLAVE_ID); // Use previously created reservation. expectTaskStillPendingQuery(TASK_A); expectActiveJobFetch(TASK_A); - expectGetReservation(TASK_A, SLAVE_ID); - expectAssigned(TASK_A, Optional.of(SLAVE_ID)).andReturn(true); + expectAsMap(ImmutableMap.of(SLAVE_ID, GROUP_KEY)); + expectAssigned(TASK_A, ImmutableMap.of(SLAVE_ID, GROUP_KEY)).andReturn(true); control.replay(); @@ -187,7 +194,7 @@ public class TaskSchedulerImplTest extends EasyMockTest { expectTaskStillPendingQuery(TASK_A); expectActiveJobFetch(TASK_A); - expectReservationCheck(TASK_A); + expectAsMap(NO_RESERVATION); expectAssigned(TASK_A, NO_RESERVATION).andReturn(false); expectGetReservation(TASK_A, SLAVE_ID); @@ -202,7 +209,7 @@ public class TaskSchedulerImplTest extends EasyMockTest { expectTaskStillPendingQuery(TASK_A); expectActiveJobFetch(TASK_A); - expectReservationCheck(TASK_A); + expectAsMap(NO_RESERVATION); expectAssigned(TASK_A, NO_RESERVATION).andReturn(false); expectGetReservation(TASK_A, SLAVE_ID); @@ -220,7 +227,7 @@ public class TaskSchedulerImplTest extends EasyMockTest { @Test public void testPendingDeletedHandled() throws Exception { - reservations.remove(TaskGroupKey.from(TASK_A.getAssignedTask().getTask()), SLAVE_ID); + reservations.remove(SLAVE_ID, TaskGroupKey.from(TASK_A.getAssignedTask().getTask())); control.replay(); @@ -250,7 +257,7 @@ public class TaskSchedulerImplTest extends EasyMockTest { } }); - expectReservationCheck(TASK_A); + expectAsMap(NO_RESERVATION); expect(assigner.maybeAssign( EasyMock.anyObject(), eq(new ResourceRequest(taskA.getAssignedTask().getTask(), EMPTY)), @@ -267,7 +274,7 @@ public class TaskSchedulerImplTest extends EasyMockTest { public void 
testScheduleThrows() throws Exception { storageUtil.expectOperations(); - expectReservationCheck(TASK_A); + expectAsMap(NO_RESERVATION); expectTaskStillPendingQuery(TASK_A); expectActiveJobFetch(TASK_A); expectAssigned(TASK_A, NO_RESERVATION).andThrow(new IllegalArgumentException("expected")); @@ -292,16 +299,20 @@ public class TaskSchedulerImplTest extends EasyMockTest { } private void expectAddReservation(IScheduledTask task, String slaveId) { - reservations.put(TaskGroupKey.from(task.getAssignedTask().getTask()), slaveId); + reservations.put(slaveId, TaskGroupKey.from(task.getAssignedTask().getTask())); } private IExpectationSetters<?> expectGetReservation(IScheduledTask task, String slaveId) { - return expect(reservations.get(TaskGroupKey.from(task.getAssignedTask().getTask()))) - .andReturn(Optional.of(slaveId)); + return expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask()))) + .andReturn(ImmutableSet.of(slaveId)); + } + + private IExpectationSetters<?> expectNoReservation(IScheduledTask task) { + return expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask()))) + .andReturn(ImmutableSet.of()); } - private IExpectationSetters<?> expectReservationCheck(IScheduledTask task) { - return expect(reservations.get(TaskGroupKey.from(task.getAssignedTask().getTask()))) - .andReturn(Optional.<String>absent()); + private IExpectationSetters<?> expectAsMap(Map<String, TaskGroupKey> map) { + return expect(reservations.asMap()).andReturn(map); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/e8a61f68/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java index c9c6f5d..1de1d1f 100644 --- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java +++ 
b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java @@ -13,6 +13,8 @@ */ package org.apache.aurora.scheduler.state; +import java.util.Map; + import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -37,6 +39,7 @@ import org.apache.aurora.scheduler.offers.OfferManager; import org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import org.apache.mesos.Protos.FrameworkID; import org.apache.mesos.Protos.OfferID; import org.apache.mesos.Protos.Resource; @@ -63,10 +66,11 @@ import static org.junit.Assert.assertTrue; public class TaskAssignerImplTest extends EasyMockTest { private static final int PORT = 5000; + private static final String SLAVE_ID = "slaveId"; private static final Offer MESOS_OFFER = Offer.newBuilder() .setId(OfferID.newBuilder().setValue("offerId")) .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId")) - .setSlaveId(SlaveID.newBuilder().setValue("slaveId")) + .setSlaveId(SlaveID.newBuilder().setValue(SLAVE_ID)) .setHostname("hostName") .addResources(Resource.newBuilder() .setName("ports") @@ -91,6 +95,7 @@ public class TaskAssignerImplTest extends EasyMockTest { .setTaskId(TaskID.newBuilder().setValue(Tasks.id(TASK))) .setSlaveId(MESOS_OFFER.getSlaveId()) .build(); + private static final Map<String, TaskGroupKey> NO_RESERVATION = ImmutableMap.of(); private MutableStoreProvider storeProvider; private StateManager stateManager; @@ -134,7 +139,7 @@ public class TaskAssignerImplTest extends EasyMockTest { new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY), TaskGroupKey.from(TASK.getAssignedTask().getTask()), Tasks.id(TASK), - Optional.of(MESOS_OFFER.getSlaveId().getValue()))); + ImmutableMap.of(SLAVE_ID, GROUP_KEY))); } 
@Test @@ -153,7 +158,7 @@ public class TaskAssignerImplTest extends EasyMockTest { new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY), TaskGroupKey.from(TASK.getAssignedTask().getTask()), Tasks.id(TASK), - Optional.<String>absent())); + NO_RESERVATION)); } @Test @@ -171,7 +176,7 @@ public class TaskAssignerImplTest extends EasyMockTest { new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY), TaskGroupKey.from(TASK.getAssignedTask().getTask()), Tasks.id(TASK), - Optional.<String>absent())); + NO_RESERVATION)); } @Test @@ -207,7 +212,7 @@ public class TaskAssignerImplTest extends EasyMockTest { new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY), TaskGroupKey.from(TASK.getAssignedTask().getTask()), Tasks.id(TASK), - Optional.<String>absent())); + NO_RESERVATION)); } @Test @@ -221,6 +226,52 @@ public class TaskAssignerImplTest extends EasyMockTest { new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY), TaskGroupKey.from(TASK.getAssignedTask().getTask()), Tasks.id(TASK), - Optional.of("invalid"))); + ImmutableMap.of(SLAVE_ID, TaskGroupKey.from( + ITaskConfig.build(new TaskConfig().setJob(new JobKey("other", "e", "n"))))))); + } + + @Test + public void testTaskWithReservedSlaveLandsElsewhere() throws Exception { + // Ensures slave/task reservation relationship is only enforced in slave->task direction + // and permissive in task->slave direction. In other words, a task with a slave reservation + // should still be tried against other unreserved slaves. 
+ HostOffer offer = new HostOffer( + Offer.newBuilder() + .setId(OfferID.newBuilder().setValue("offerId0")) + .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId")) + .setSlaveId(SlaveID.newBuilder().setValue("slaveId0")) + .setHostname("hostName0") + .addResources(Resource.newBuilder() + .setName("ports") + .setType(Type.RANGES) + .setRanges( + Ranges.newBuilder().addRange(Range.newBuilder().setBegin(PORT).setEnd(PORT)))) + .build(), + IHostAttributes.build(new HostAttributes())); + + expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(offer, OFFER)); + expect(filter.filter( + new UnusedResource(ResourceSlot.from(offer.getOffer()), offer.getAttributes()), + new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY))) + .andReturn(ImmutableSet.of()); + expect(stateManager.assignTask( + storeProvider, + Tasks.id(TASK), + offer.getOffer().getHostname(), + offer.getOffer().getSlaveId(), + ImmutableMap.of(PORT_NAME, PORT))) + .andReturn(TASK.getAssignedTask()); + expect(taskFactory.createFrom(TASK.getAssignedTask(), offer.getOffer().getSlaveId())) + .andReturn(TASK_INFO); + offerManager.launchTask(offer.getOffer().getId(), TASK_INFO); + + control.replay(); + + assertTrue(assigner.maybeAssign( + storeProvider, + new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY), + TaskGroupKey.from(TASK.getAssignedTask().getTask()), + Tasks.id(TASK), + ImmutableMap.of(SLAVE_ID, GROUP_KEY))); } }
