Repository: aurora Updated Branches: refs/heads/master 9b8868fb7 -> f5025f3c6
Adding TierManager initial implementation. Bugs closed: AURORA-1437 Reviewed at https://reviews.apache.org/r/37560/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/f5025f3c Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/f5025f3c Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/f5025f3c Branch: refs/heads/master Commit: f5025f3c607d81333d4a498ad3bb8fe3417ba392 Parents: 9b8868f Author: Maxim Khutornenko <[email protected]> Authored: Tue Aug 18 15:40:58 2015 -0700 Committer: Maxim Khutornenko <[email protected]> Committed: Tue Aug 18 15:40:58 2015 -0700 ---------------------------------------------------------------------- .../org/apache/aurora/scheduler/Resources.java | 26 ++++++++- .../aurora/scheduler/SchedulerModule.java | 4 ++ .../org/apache/aurora/scheduler/TierInfo.java | 61 ++++++++++++++++++++ .../apache/aurora/scheduler/TierManager.java | 41 +++++++++++++ .../aurora/scheduler/state/TaskAssigner.java | 14 ++++- .../apache/aurora/scheduler/ResourcesTest.java | 38 +++++++++--- .../aurora/scheduler/TierManagerTest.java | 30 ++++++++++ .../scheduler/state/TaskAssignerImplTest.java | 13 ++++- 8 files changed, 215 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/f5025f3c/src/main/java/org/apache/aurora/scheduler/Resources.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/Resources.java b/src/main/java/org/apache/aurora/scheduler/Resources.java index 40df262..3b4fbb6 100644 --- a/src/main/java/org/apache/aurora/scheduler/Resources.java +++ b/src/main/java/org/apache/aurora/scheduler/Resources.java @@ -17,8 +17,10 @@ import java.util.Collections; import java.util.List; import java.util.Set; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Predicate; +import com.google.common.base.Predicates; import com.google.common.collect.ContiguousSet; import com.google.common.collect.DiscreteDomain; import com.google.common.collect.ImmutableList; @@ -48,7 +50,19 @@ public final class Resources { /** * CPU resource filter. */ - public static final Predicate<Resource> CPU = e -> e.getName().equals(CPUS.getName()); + private static final Predicate<Resource> CPU = e -> e.getName().equals(CPUS.getName()); + + /** + * Revocable resource filter. + */ + @VisibleForTesting + static final Predicate<Resource> REVOCABLE = + Predicates.or(Predicates.not(CPU), Predicates.and(CPU, Resource::hasRevocable)); + + /** + * Non-revocable resource filter. + */ + private static final Predicate<Resource> NON_REVOCABLE = Predicates.not(Resource::hasRevocable); private final Iterable<Resource> mesosResources; @@ -77,6 +91,16 @@ public final class Resources { } /** + * Filters resources using the provided {@code tierInfo} instance. + * + * @param tierInfo Tier info. + * @return A new {@code Resources} object containing only filtered Mesos resources. + */ + public Resources filter(TierInfo tierInfo) { + return filter(tierInfo.isRevocable() ? REVOCABLE : NON_REVOCABLE); + } + + /** * Gets generalized aggregated resource view. * * @return {@code ResourceSlot} instance. http://git-wip-us.apache.org/repos/asf/aurora/blob/f5025f3c/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java index 45265ea..e8d53c7 100644 --- a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java +++ b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java @@ -31,6 +31,7 @@ import com.twitter.common.quantity.Time; import org.apache.aurora.scheduler.SchedulerLifecycle.LeadingOptions; import org.apache.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl; +import org.apache.aurora.scheduler.TierManager.TierManagerImpl; import org.apache.aurora.scheduler.base.AsyncUtil; import org.apache.aurora.scheduler.events.PubsubEventModule; import org.apache.mesos.Protos; @@ -92,6 +93,9 @@ public class SchedulerModule extends AbstractModule { bind(TaskStatusHandler.class).to(TaskStatusHandlerImpl.class); bind(TaskStatusHandlerImpl.class).in(Singleton.class); + + bind(TierManager.class).to(TierManagerImpl.class); + bind(TierManagerImpl.class).in(Singleton.class); addSchedulerActiveServiceBinding(binder()).to(TaskStatusHandlerImpl.class); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/f5025f3c/src/main/java/org/apache/aurora/scheduler/TierInfo.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/TierInfo.java b/src/main/java/org/apache/aurora/scheduler/TierInfo.java new file mode 100644 index 0000000..61bf30a --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/TierInfo.java @@ -0,0 +1,61 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler; + +import java.util.Objects; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Defines common task tier traits and behaviors. + */ +public class TierInfo { + private final boolean revocable; + + @VisibleForTesting + public TierInfo(boolean revocable) { + this.revocable = revocable; + } + + /** + * Checks if this tier intends to run with Mesos revocable resource offers. + * + * @return {@code true} if this tier requires revocable resource offers, {@code false} otherwise. + */ + public boolean isRevocable() { + return revocable; + } + + @Override + public int hashCode() { + return Objects.hashCode(revocable); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof TierInfo)) { + return false; + } + + TierInfo other = (TierInfo) obj; + return Objects.equals(revocable, other.revocable); + } + + @Override + public String toString() { + return com.google.common.base.Objects.toStringHelper(this) + .add("revocable", revocable) + .toString(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/f5025f3c/src/main/java/org/apache/aurora/scheduler/TierManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/TierManager.java b/src/main/java/org/apache/aurora/scheduler/TierManager.java new file mode 100644 index 0000000..ebfad97 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/TierManager.java @@ -0,0 +1,41 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler; + +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; + +/** + * Translates job tier configuration into a set of task traits/attributes. + * TODO(maxim): Implement external configuration support defined here: + * https://docs.google.com/document/d/1gexe2uM_9gjsV62cMmX0VjH85Uokko21vEoENY2jjF0 + */ +public interface TierManager { + + /** + * Gets {@link TierInfo} instance representing task's tier details. + * + * @param taskConfig Task configuration to get tier for. + * @return {@link TierInfo} for the given {@code taskConfig}. + */ + TierInfo getTier(ITaskConfig taskConfig); + + class TierManagerImpl implements TierManager { + + @Override + public TierInfo getTier(ITaskConfig taskConfig) { + // TODO(maxim): Implement when schema changes are defined. + return new TierInfo(false); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/f5025f3c/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java index ca4b5b0..2ab110e 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java +++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java @@ -33,6 +33,8 @@ import com.twitter.common.stats.Stats; import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.Resources; +import org.apache.aurora.scheduler.TierInfo; +import org.apache.aurora.scheduler.TierManager; import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.filter.SchedulingFilter; import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; @@ -86,18 +88,21 @@ public interface TaskAssigner { private final SchedulingFilter filter; private final MesosTaskFactory taskFactory; private final OfferManager offerManager; + private final TierManager tierManager; @Inject public TaskAssignerImpl( StateManager stateManager, SchedulingFilter filter, MesosTaskFactory taskFactory, - OfferManager offerManager) { + OfferManager offerManager, + TierManager tierManager) { this.stateManager = requireNonNull(stateManager); this.filter = requireNonNull(filter); this.taskFactory = requireNonNull(taskFactory); this.offerManager = requireNonNull(offerManager); + this.tierManager = requireNonNull(tierManager); } private TaskInfo assign( @@ -147,9 +152,14 @@ public interface TaskAssigner { // This slave is reserved for a different task group -> skip. continue; } + + TierInfo tierInfo = tierManager.getTier(groupKey.getTask()); Set<Veto> vetoes = filter.filter( - new UnusedResource(Resources.from(offer.getOffer()).slot(), offer.getAttributes()), + new UnusedResource( + Resources.from(offer.getOffer()).filter(tierInfo).slot(), + offer.getAttributes()), resourceRequest); + if (vetoes.isEmpty()) { TaskInfo taskInfo = assign( storeProvider, http://git-wip-us.apache.org/repos/asf/aurora/blob/f5025f3c/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java b/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java index a5878a4..c48d096 100644 --- a/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java +++ b/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java @@ -93,7 +93,7 @@ public class ResourcesTest { @Test public void testGetSlot() { ImmutableList<Resource> resources = ImmutableList.<Resource>builder() - .add(createCpuResource(8.0)) + .add(createCpuResource(8.0, false)) .add(createMemResource(1024, RAM_MB)) .add(createMemResource(2048, DISK_MB)) .add(createPortRange(Pair.of(1, 10))) @@ -112,13 +112,30 @@ public class ResourcesTest { @Test public void testFilter() { ImmutableList<Resource> resources = ImmutableList.<Resource>builder() - .add(createCpuResource(8.0)) + .add(createCpuResource(8.0, true)) .add(createMemResource(1024, RAM_MB)) .build(); assertEquals( - Resources.from(createOffer(createCpuResource(8.0))).slot(), - Resources.from(createOffer(resources)).filter(Resources.CPU).slot()); + new ResourceSlot(8.0, Amount.of(1024L, MB), Amount.of(0L, MB), 0), + Resources.from(createOffer(resources)).filter(Resources.REVOCABLE).slot()); + } + + @Test + public void testFilterByTier() { + ImmutableList<Resource> resources = ImmutableList.<Resource>builder() + .add(createCpuResource(8.0, true)) + .add(createCpuResource(8.0, false)) + .add(createMemResource(1024, RAM_MB)) + .build(); + + assertEquals( + new ResourceSlot(8.0, Amount.of(1024L, MB), Amount.of(0L, MB), 0), + Resources.from(createOffer(resources)).filter(new TierInfo(true)).slot()); + + assertEquals( + new ResourceSlot(8.0, Amount.of(1024L, MB), Amount.of(0L, MB), 0), + Resources.from(createOffer(resources)).filter(new TierInfo(false)).slot()); } private Resource createPortRange(Pair<Integer, Integer> range) { @@ -143,12 +160,17 @@ public class ResourcesTest { .build(); } - private static Resource createCpuResource(double cpus) { - return Resource.newBuilder() + private static Resource createCpuResource(double cpus, boolean revocable) { + Protos.Resource.Builder builder = Resource.newBuilder() .setName(CPUS.getName()) .setType(SCALAR) - .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpus)) - .build(); + .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpus)); + + if (revocable) { + builder.setRevocable(Resource.RevocableInfo.newBuilder().build()); + } + + return builder.build(); } private static Resource createMemResource(long mem, ResourceType resourceType) { http://git-wip-us.apache.org/repos/asf/aurora/blob/f5025f3c/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java b/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java new file mode 100644 index 0000000..37e19ac --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java @@ -0,0 +1,30 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler; + +import org.apache.aurora.gen.TaskConfig; +import org.apache.aurora.scheduler.TierManager.TierManagerImpl; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TierManagerTest { + + @Test + public void testIsRevocable() { + TierInfo expected = new TierInfo(false); + assertEquals(expected, new TierManagerImpl().getTier(ITaskConfig.build(new TaskConfig()))); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/f5025f3c/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java index 88958d1..33054a6 100644 --- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java @@ -28,6 +28,8 @@ import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.Resources; +import org.apache.aurora.scheduler.TierInfo; +import org.apache.aurora.scheduler.TierManager; import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.filter.SchedulingFilter; @@ -100,6 +102,7 @@ public class TaskAssignerImplTest extends EasyMockTest { new UnusedResource(Resources.from(MESOS_OFFER).slot(), OFFER.getAttributes()); private static final ResourceRequest RESOURCE_REQUEST = new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY); + private static final TierInfo DEFAULT_TIER = new TierInfo(false); private MutableStoreProvider storeProvider; private StateManager stateManager; @@ -107,6 +110,7 @@ public class TaskAssignerImplTest extends EasyMockTest { private MesosTaskFactory taskFactory; private OfferManager offerManager; private TaskAssigner assigner; + private TierManager tierManager; @Before public void setUp() throws Exception { @@ -115,13 +119,15 @@ public class TaskAssignerImplTest extends EasyMockTest { taskFactory = createMock(MesosTaskFactory.class); stateManager = createMock(StateManager.class); offerManager = createMock(OfferManager.class); - assigner = new TaskAssignerImpl(stateManager, filter, taskFactory, offerManager); + tierManager = createMock(TierManager.class); + assigner = new TaskAssignerImpl(stateManager, filter, taskFactory, offerManager, tierManager); } @Test public void testAssignNoVetoes() throws Exception { expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER)); offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO); + expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEFAULT_TIER); expect(filter.filter(UNUSED, RESOURCE_REQUEST)).andReturn(ImmutableSet.of()); expect(stateManager.assignTask( storeProvider, @@ -147,6 +153,7 @@ public class TaskAssignerImplTest extends EasyMockTest { public void testAssignVetoesWithStaticBan() throws Exception { expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER)); offerManager.banOffer(MESOS_OFFER.getId(), GROUP_KEY); + expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEFAULT_TIER); expect(filter.filter(UNUSED, RESOURCE_REQUEST)) .andReturn(ImmutableSet.of(Veto.constraintMismatch("denied"))); @@ -163,6 +170,7 @@ public class TaskAssignerImplTest extends EasyMockTest { @Test public void testAssignVetoesWithNoStaticBan() throws Exception { expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER)); + expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEFAULT_TIER); expect(filter.filter(UNUSED, RESOURCE_REQUEST)) .andReturn(ImmutableSet.of(Veto.unsatisfiedLimit("limit"))); @@ -181,6 +189,7 @@ public class TaskAssignerImplTest extends EasyMockTest { expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER)); offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO); expectLastCall().andThrow(new OfferManager.LaunchException("expected")); + expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEFAULT_TIER); expect(filter.filter(UNUSED, RESOURCE_REQUEST)).andReturn(ImmutableSet.of()); expect(stateManager.assignTask( storeProvider, @@ -244,6 +253,7 @@ public class TaskAssignerImplTest extends EasyMockTest { IHostAttributes.build(new HostAttributes())); expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(offer, OFFER)); + expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEFAULT_TIER); expect(filter.filter(UNUSED, RESOURCE_REQUEST)).andReturn(ImmutableSet.of()); expect(stateManager.assignTask( storeProvider, @@ -284,6 +294,7 @@ public class TaskAssignerImplTest extends EasyMockTest { IHostAttributes.build(new HostAttributes())); expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(mismatched, OFFER)); + expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEFAULT_TIER).times(2); expect(filter.filter( new UnusedResource( Resources.from(mismatched.getOffer()).slot(),
