Repository: aurora Updated Branches: refs/heads/master 74a121772 -> f4446a612
Updating preemptor to account for revocable offers/tasks Bugs closed: AURORA-1418 Reviewed at https://reviews.apache.org/r/37624/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/f4446a61 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/f4446a61 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/f4446a61 Branch: refs/heads/master Commit: f4446a61220344c029af669ed0351e086d0a198d Parents: 74a1217 Author: Maxim Khutornenko <[email protected]> Authored: Thu Aug 20 15:21:53 2015 -0700 Committer: Maxim Khutornenko <[email protected]> Committed: Thu Aug 20 15:21:53 2015 -0700 ---------------------------------------------------------------------- .../scheduler/preemptor/PreemptionVictim.java | 68 ++++------- .../preemptor/PreemptionVictimFilter.java | 16 ++- .../preemptor/PreemptionVictimFilterTest.java | 117 +++++++++++++++++-- 3 files changed, 143 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/f4446a61/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java index 8162323..8f3161a 100644 --- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java @@ -19,66 +19,48 @@ import org.apache.aurora.scheduler.ResourceSlot; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; +import static java.util.Objects.requireNonNull; + /** * A victim to be considered as a candidate for preemption. */ public final class PreemptionVictim { - private final String slaveHost; - private final boolean production; - private final String role; - private final int priority; - private final ResourceSlot resourceSlot; - private final String taskId; - - private PreemptionVictim( - String slaveHost, - boolean production, - String role, - int priority, - ResourceSlot resourceSlot, - String taskId) { - - this.slaveHost = slaveHost; - this.production = production; - this.role = role; - this.priority = priority; - this.resourceSlot = resourceSlot; - this.taskId = taskId; + private final IAssignedTask task; + + private PreemptionVictim(IAssignedTask task) { + this.task = requireNonNull(task); } public static PreemptionVictim fromTask(IAssignedTask task) { - ITaskConfig config = task.getTask(); - return new PreemptionVictim( - task.getSlaveHost(), - config.isProduction(), - config.getJob().getRole(), - config.getPriority(), - ResourceSlot.from(task.getTask()), - task.getTaskId()); + return new PreemptionVictim(task); } public String getSlaveHost() { - return slaveHost; + return task.getSlaveHost(); } public boolean isProduction() { - return production; + return task.getTask().isProduction(); } public String getRole() { - return role; + return task.getTask().getJob().getRole(); } public int getPriority() { - return priority; + return task.getTask().getPriority(); } public ResourceSlot getResourceSlot() { - return resourceSlot; + return ResourceSlot.from(task.getTask()); } public String getTaskId() { - return taskId; + return task.getTaskId(); + } + + public ITaskConfig getConfig() { + return task.getTask(); } @Override @@ -88,28 +70,18 @@ public final class PreemptionVictim { } PreemptionVictim other = (PreemptionVictim) o; - return Objects.equals(getSlaveHost(), other.getSlaveHost()) - && Objects.equals(isProduction(), other.isProduction()) - && Objects.equals(getRole(), other.getRole()) - && Objects.equals(getPriority(), other.getPriority()) - && Objects.equals(getResourceSlot(), other.getResourceSlot()) - && Objects.equals(getTaskId(), other.getTaskId()); + return Objects.equals(task, other.task); } @Override public int hashCode() { - return Objects.hash(slaveHost, production, role, priority, resourceSlot, taskId); + return Objects.hashCode(task); } @Override public String toString() { return com.google.common.base.Objects.toStringHelper(this) - .add("slaveHost", getSlaveHost()) - .add("production", isProduction()) - .add("role", getRole()) - .add("priority", getPriority()) - .add("resourceSlot", getResourceSlot()) - .add("taskId", getTaskId()) + .add("task", task) .toString(); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/f4446a61/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java index a0e71e1..67d7f07 100644 --- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java @@ -29,6 +29,7 @@ import com.google.common.collect.Sets; import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.ResourceSlot; import org.apache.aurora.scheduler.Resources; +import org.apache.aurora.scheduler.TierManager; import org.apache.aurora.scheduler.filter.AttributeAggregate; import org.apache.aurora.scheduler.filter.SchedulingFilter; import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; @@ -80,23 +81,26 @@ public interface PreemptionVictimFilter { private final SchedulingFilter schedulingFilter; private final ExecutorSettings executorSettings; private final PreemptorMetrics metrics; + private final TierManager tierManager; @Inject PreemptionVictimFilterImpl( SchedulingFilter schedulingFilter, ExecutorSettings executorSettings, - PreemptorMetrics metrics) { + PreemptorMetrics metrics, + TierManager tierManager) { this.schedulingFilter = requireNonNull(schedulingFilter); this.executorSettings = requireNonNull(executorSettings); this.metrics = requireNonNull(metrics); + this.tierManager = requireNonNull(tierManager); } private static final Function<HostOffer, ResourceSlot> OFFER_TO_RESOURCE_SLOT = new Function<HostOffer, ResourceSlot>() { @Override public ResourceSlot apply(HostOffer offer) { - return Resources.from(offer.getOffer()).slot(); + return Resources.from(offer.getOffer()).filter(Resources.NON_REVOCABLE).slot(); } }; @@ -120,7 +124,13 @@ public interface PreemptionVictimFilter { new Function<PreemptionVictim, ResourceSlot>() { @Override public ResourceSlot apply(PreemptionVictim victim) { - return victim.getResourceSlot().withOverhead(executorSettings); + ResourceSlot slot = victim.getResourceSlot(); + if (tierManager.getTier(victim.getConfig()).isRevocable()) { + // Revocable task CPU cannot be used for preemption purposes as it's a compressible + // resource. We can still use RAM, DISK and PORTS as they are not compressible. + slot = new ResourceSlot(0.0, slot.getRam(), slot.getDisk(), slot.getNumPorts()); + } + return slot.withOverhead(executorSettings); } }; http://git-wip-us.apache.org/repos/asf/aurora/blob/f4446a61/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java index 66f20c6..8a1599a 100644 --- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java +++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java @@ -20,6 +20,7 @@ import java.util.Set; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Data; @@ -37,6 +38,8 @@ import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.gen.TaskEvent; import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.ResourceSlot; +import org.apache.aurora.scheduler.TierInfo; +import org.apache.aurora.scheduler.TierManager; import org.apache.aurora.scheduler.filter.SchedulingFilter; import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; import org.apache.aurora.scheduler.filter.SchedulingFilterImpl; @@ -47,6 +50,7 @@ import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; import org.apache.aurora.scheduler.testing.FakeStatsProvider; +import org.apache.mesos.Protos; import org.easymock.EasyMock; import org.easymock.IAnswer; import org.easymock.IExpectationSetters; @@ -55,6 +59,7 @@ import org.junit.Test; import static org.apache.aurora.gen.MaintenanceMode.NONE; import static org.apache.aurora.gen.ScheduleStatus.RUNNING; +import static org.apache.aurora.scheduler.ResourceType.CPUS; import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY; import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.MISSING_ATTRIBUTES_NAME; import static org.apache.mesos.Protos.Offer; @@ -80,11 +85,13 @@ public class PreemptionVictimFilterTest extends EasyMockTest { private static final String HOST_ATTRIBUTE = "host"; private static final String OFFER = "offer"; private static final Optional<HostOffer> NO_OFFER = Optional.absent(); + private static final TierInfo DEFAULT_TIER = new TierInfo(false); private StorageTestUtil storageUtil; private SchedulingFilter schedulingFilter; private FakeStatsProvider statsProvider; private PreemptorMetrics preemptorMetrics; + private TierManager tierManager; @Before public void setUp() { @@ -92,6 +99,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest { storageUtil.expectOperations(); statsProvider = new FakeStatsProvider(); preemptorMetrics = new PreemptorMetrics(new CachedCounters(statsProvider)); + tierManager = createMock(TierManager.class); } private Optional<ImmutableSet<PreemptionVictim>> runFilter( @@ -103,7 +111,8 @@ public class PreemptionVictimFilterTest extends EasyMockTest { new PreemptionVictimFilter.PreemptionVictimFilterImpl( schedulingFilter, TaskExecutors.NO_OVERHEAD_EXECUTOR, - preemptorMetrics); + preemptorMetrics, + tierManager); return filter.filterPreemptionVictims( ITaskConfig.build(pendingTask.getAssignedTask().getTask()), @@ -118,6 +127,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest { setUpHost(); schedulingFilter = createMock(SchedulingFilter.class); + expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER); ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A); assignToHost(lowPriority); @@ -134,6 +144,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest { setUpHost(); schedulingFilter = createMock(SchedulingFilter.class); + expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER); ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 10); assignToHost(lowPriority); @@ -153,6 +164,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest { setUpHost(); schedulingFilter = createMock(SchedulingFilter.class); + expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER); ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 100); assignToHost(highPriority); @@ -189,6 +201,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest { setUpHost(); schedulingFilter = createMock(SchedulingFilter.class); + expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER); // Use a very low priority for the production task to show that priority is irrelevant. ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000); ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_B + "_a1", 100); @@ -205,6 +218,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest { setUpHost(); schedulingFilter = createMock(SchedulingFilter.class); + expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER); // Use a very low priority for the production task to show that priority is irrelevant. ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000); ScheduledTask a1 = makeTask(USER_B, JOB_A, TASK_ID_B + "_a1", 100); @@ -231,6 +245,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest { @Test public void testProductionPreemptingManyNonProduction() throws Exception { schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR); + expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER).times(5); ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); @@ -253,6 +268,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest { @Test public void testMinimalSetPreempted() throws Exception { schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR); + expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER).times(9); ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); a1.getAssignedTask().getTask().setNumCpus(4).setRamMb(4096); @@ -297,6 +313,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest { @Test public void testPreemptWithOfferAndTask() throws Exception { schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR); + expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER); setUpHost(); @@ -309,7 +326,78 @@ public class PreemptionVictimFilterTest extends EasyMockTest { control.replay(); assertVictims( - runFilter(p1, makeOffer(OFFER, 1, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1), a1), + runFilter( + p1, + makeOffer(OFFER, 1, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1, false), + a1), + a1); + } + + // Ensures revocable offer resources are filtered out. + @Test + public void testRevocableOfferFiltered() throws Exception { + schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR); + expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER); + + setUpHost(); + + ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); + a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); + assignToHost(a1); + + ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1"); + p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024); + + control.replay(); + assertNoVictims(runFilter( + p1, + makeOffer(OFFER, 1, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1, true), + a1)); + } + + // Ensures revocable task CPU is not considered for preemption. + @Test + public void testRevocableVictimsFiltered() throws Exception { + schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR); + expect(tierManager.getTier(EasyMock.anyObject())).andReturn(new TierInfo(true)); + + setUpHost(); + + ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); + a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); + assignToHost(a1); + + ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1"); + p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024); + + control.replay(); + assertNoVictims(runFilter( + p1, + makeOffer(OFFER, 1, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1, false), + a1)); + } + + // Ensures revocable victim non-compressible resources are still considered. + @Test + public void testRevocableVictimRamUsed() throws Exception { + schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR); + expect(tierManager.getTier(EasyMock.anyObject())).andReturn(new TierInfo(true)); + + setUpHost(); + + ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); + a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); + assignToHost(a1); + + ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1"); + p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024); + + control.replay(); + assertVictims( + runFilter( + p1, + makeOffer(OFFER, 2, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1, false), + a1), a1); } @@ -317,6 +405,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest { @Test public void testPreemptWithOfferAndMultipleTasks() throws Exception { schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR); + expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER).times(5); setUpHost(); @@ -333,7 +422,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest { control.replay(); Optional<HostOffer> offer = - makeOffer(OFFER, 2, Amount.of(1024L, Data.MB), Amount.of(1L, Data.MB), 1); + makeOffer(OFFER, 2, Amount.of(1024L, Data.MB), Amount.of(1L, Data.MB), 1, false); assertVictims(runFilter(p1, offer, a1, a2), a1, a2); } @@ -368,6 +457,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest { @Test public void testAllVictimsVetoed() { schedulingFilter = createMock(SchedulingFilter.class); + expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER); ScheduledTask task = makeProductionTask(USER_A, JOB_A, TASK_ID_A); assignToHost(task); @@ -410,17 +500,30 @@ public class PreemptionVictimFilterTest extends EasyMockTest { double cpu, Amount<Long, Data> ram, Amount<Long, Data> disk, - int numPorts) { + int numPorts, + boolean revocable) { List<Resource> resources = new ResourceSlot(cpu, ram, disk, numPorts).toResourceList(); + if (revocable) { + resources = ImmutableList.<Resource>builder() + .addAll(FluentIterable.from(resources) + .filter(e -> !e.getName().equals(CPUS.getName())) + .toList()) + .add(Protos.Resource.newBuilder() + .setName(CPUS.getName()) + .setType(Protos.Value.Type.SCALAR) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpu)) + .setRevocable(Resource.RevocableInfo.newBuilder()) + .build()) + .build(); + } Offer.Builder builder = Offer.newBuilder(); builder.getIdBuilder().setValue(offerId); builder.getFrameworkIdBuilder().setValue("framework-id"); builder.getSlaveIdBuilder().setValue(SLAVE_ID); builder.setHostname(HOST); - for (Resource r: resources) { - builder.addResources(r); - } + builder.addAllResources(resources); + return Optional.of(new HostOffer( builder.build(), IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE))));
